Skip to content

Commit 6555c90

Browse files
committed
simplify ArrowBufReader decoding logic
1 parent 427fb5b commit 6555c90

1 file changed

Lines changed: 18 additions & 73 deletions

File tree

flight/flight-core/src/main/java/org/apache/arrow/flight/FlightDataParser.java

Lines changed: 18 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -116,31 +116,32 @@ private void parseFields() throws IOException {
116116
if (tag == -1) {
117117
break;
118118
}
119+
int size = readLength();
119120
switch (tag) {
120121
case DESCRIPTOR_TAG:
121122
{
122-
int size = readLength();
123123
byte[] bytes = readBytes(size);
124124
descriptor = FlightDescriptor.parseFrom(bytes);
125125
break;
126126
}
127127
case HEADER_TAG:
128128
{
129-
int size = readLength();
130129
byte[] bytes = readBytes(size);
131130
header = MessageMetadataResult.create(ByteBuffer.wrap(bytes), size);
132131
break;
133132
}
134133
case APP_METADATA_TAG:
135134
{
136-
int size = readLength();
135+
// Called before reading a new value to handle duplicate protobuf fields
136+
// (last occurrence wins per spec) and prevent memory leaks.
137137
closeAppMetadata();
138138
appMetadata = readBuffer(size);
139139
break;
140140
}
141141
case BODY_TAG:
142142
{
143-
int size = readLength();
143+
// Called before reading a new value to handle duplicate protobuf fields
144+
// (last occurrence wins per spec) and prevent memory leaks.
144145
closeBody();
145146
body = readBuffer(size);
146147
break;
@@ -239,12 +240,13 @@ static final class ArrowBufReader extends FlightDataReader {
239240
private static final Logger LOG = LoggerFactory.getLogger(ArrowBufReader.class);
240241

241242
private final ArrowBuf backingBuffer;
242-
private final ByteBuffer buffer;
243+
private final CodedInputStream codedInput;
243244

244245
ArrowBufReader(BufferAllocator allocator, ArrowBuf backingBuffer) {
245246
super(allocator);
246247
this.backingBuffer = backingBuffer;
247-
this.buffer = backingBuffer.nioBuffer(0, (int) backingBuffer.capacity());
248+
ByteBuffer buffer = backingBuffer.nioBuffer(0, (int) backingBuffer.capacity());
249+
this.codedInput = CodedInputStream.newInstance(buffer);
248250
}
249251

250252
static ArrowBufReader tryArrowBufReader(BufferAllocator allocator, InputStream stream) {
@@ -320,89 +322,32 @@ protected void cleanup() {
320322
}
321323

322324
@Override
323-
protected boolean hasRemaining() {
324-
return buffer.hasRemaining();
325+
protected boolean hasRemaining() throws IOException {
326+
return !codedInput.isAtEnd();
325327
}
326328

327329
@Override
328330
protected int readTag() throws IOException {
329-
if (!buffer.hasRemaining()) {
330-
return -1;
331-
}
332-
int tagFirstByte = buffer.get() & 0xFF;
333-
return readRawVarint32(tagFirstByte);
331+
int tag = codedInput.readTag();
332+
return tag == 0 ? -1 : tag;
334333
}
335334

336335
@Override
337336
protected int readLength() throws IOException {
338-
if (!buffer.hasRemaining()) {
339-
throw new IOException("Unexpected end of buffer");
340-
}
341-
int firstByte = buffer.get() & 0xFF;
342-
return readRawVarint32(firstByte);
343-
}
344-
345-
/**
346-
* Decodes a Base 128 Varint from the ByteBuffer.
347-
*
348-
* <p>This is a manual implementation because CodedInputStream only provides a static helper for
349-
* InputStream, not ByteBuffer. We need direct ByteBuffer access to track positions for
350-
* zero-copy slicing in {@link #readBuffer(int)}.
351-
*
352-
* <p>Varints are a variable-length encoding for integers used by Protocol Buffers. Each byte
353-
* uses 7 bits for data and 1 bit (MSB) as a continuation flag:
354-
*
355-
* <ul>
356-
* <li>MSB = 1: more bytes follow
357-
* <li>MSB = 0: this is the last byte
358-
* </ul>
359-
*
360-
* <p>Bytes are stored in little-endian order (least significant group first).
361-
*
362-
* @see <a href="https://protobuf.dev/programming-guides/encoding/#varints">Protocol Buffers
363-
* Encoding: Varints</a>
364-
*/
365-
private int readRawVarint32(int firstByte) throws IOException {
366-
// Check MSB: if 0, this single byte contains the entire value (0-127)
367-
if ((firstByte & 0x80) == 0) {
368-
return firstByte;
369-
}
370-
// Extract lower 7 bits of first byte as the starting result
371-
int result = firstByte & 0x7F;
372-
// Process continuation bytes, shifting each 7-bit group into position
373-
for (int shift = 7; shift < 32; shift += 7) {
374-
if (!buffer.hasRemaining()) {
375-
throw new IOException("Unexpected end of buffer");
376-
}
377-
int b = buffer.get() & 0xFF;
378-
// OR the 7 data bits into the result at the current shift position
379-
result |= (b & 0x7F) << shift;
380-
// If MSB is 0, we've reached the last byte
381-
if ((b & 0x80) == 0) {
382-
return result;
383-
}
384-
}
385-
// A valid 32-bit varint uses at most 5 bytes (5 * 7 = 35 bits > 32 bits)
386-
throw new IOException("Malformed varint");
337+
return codedInput.readRawVarint32();
387338
}
388339

389340
@Override
390341
protected byte[] readBytes(int size) throws IOException {
391-
if (buffer.remaining() < size) {
392-
throw new IOException("Unexpected end of buffer");
393-
}
394-
byte[] bytes = new byte[size];
395-
buffer.get(bytes);
396-
return bytes;
342+
// Reads size bytes and creates a copy
343+
return codedInput.readRawBytes(size);
397344
}
398345

399346
@Override
400347
protected ArrowBuf readBuffer(int size) throws IOException {
401-
if (buffer.remaining() < size) {
402-
throw new IOException("Unexpected end of buffer");
403-
}
404-
int offset = buffer.position();
405-
buffer.position(offset + size);
348+
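// CodedInputStream tracks its own read position; because it was created over the buffer starting at index 0, its total-bytes-read count is the offset into backingBuffer used for zero-copy slicing.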
349+
int offset = codedInput.getTotalBytesRead();
350+
codedInput.skipRawBytes(size);
406351
backingBuffer.getReferenceManager().retain();
407352
return backingBuffer.slice(offset, size);
408353
}

0 commit comments

Comments
 (0)