Skip to content

Commit 427fb5b

Browse files
committed
rework frame to wrap inputstream with ArrowBuf before reading
1 parent 33d9188 commit 427fb5b

3 files changed

Lines changed: 495 additions & 332 deletions

File tree

flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java

Lines changed: 11 additions & 214 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,10 @@
1818

1919
import com.google.common.collect.ImmutableList;
2020
import com.google.common.collect.Iterables;
21-
import com.google.common.io.ByteStreams;
2221
import com.google.protobuf.ByteString;
23-
import com.google.protobuf.CodedInputStream;
2422
import com.google.protobuf.CodedOutputStream;
2523
import com.google.protobuf.WireFormat;
26-
import io.grpc.Detachable;
2724
import io.grpc.Drainable;
28-
import io.grpc.HasByteBuffer;
2925
import io.grpc.MethodDescriptor.Marshaller;
3026
import io.grpc.protobuf.ProtoUtils;
3127
import io.netty.buffer.ByteBuf;
@@ -42,13 +38,14 @@
4238
import java.util.Arrays;
4339
import java.util.Collections;
4440
import java.util.List;
41+
import org.apache.arrow.flight.FlightDataParser.ArrowBufReader;
42+
import org.apache.arrow.flight.FlightDataParser.FlightDataReader;
43+
import org.apache.arrow.flight.FlightDataParser.InputStreamReader;
4544
import org.apache.arrow.flight.grpc.AddWritableBuffer;
4645
import org.apache.arrow.flight.impl.Flight.FlightData;
4746
import org.apache.arrow.flight.impl.Flight.FlightDescriptor;
4847
import org.apache.arrow.memory.ArrowBuf;
4948
import org.apache.arrow.memory.BufferAllocator;
50-
import org.apache.arrow.memory.ForeignAllocation;
51-
import org.apache.arrow.memory.util.MemoryUtil;
5249
import org.apache.arrow.util.AutoCloseables;
5350
import org.apache.arrow.util.Preconditions;
5451
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
@@ -82,19 +79,10 @@ class ArrowMessage implements AutoCloseable {
8279
if (zeroCopyWriteFlag == null) {
8380
zeroCopyWriteFlag = System.getenv("ARROW_FLIGHT_ENABLE_ZERO_COPY_WRITE");
8481
}
85-
ENABLE_ZERO_COPY_READ = !"false".equalsIgnoreCase(zeroCopyReadFlag);
82+
ENABLE_ZERO_COPY_READ = true; // !"false".equalsIgnoreCase(zeroCopyReadFlag);
8683
ENABLE_ZERO_COPY_WRITE = "true".equalsIgnoreCase(zeroCopyWriteFlag);
8784
}
8885

89-
private static final int DESCRIPTOR_TAG =
90-
(FlightData.FLIGHT_DESCRIPTOR_FIELD_NUMBER << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
91-
private static final int BODY_TAG =
92-
(FlightData.DATA_BODY_FIELD_NUMBER << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
93-
private static final int HEADER_TAG =
94-
(FlightData.DATA_HEADER_FIELD_NUMBER << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
95-
private static final int APP_METADATA_TAG =
96-
(FlightData.APP_METADATA_FIELD_NUMBER << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
97-
9886
private static final Marshaller<FlightData> NO_BODY_MARSHALLER =
9987
ProtoUtils.marshaller(FlightData.getDefaultInstance());
10088

@@ -219,7 +207,7 @@ public ArrowMessage(FlightDescriptor descriptor) {
219207
this.tryZeroCopyWrite = false;
220208
}
221209

222-
private ArrowMessage(
210+
ArrowMessage(
223211
FlightDescriptor descriptor,
224212
MessageMetadataResult message,
225213
ArrowBuf appMetadata,
@@ -287,207 +275,16 @@ public Iterable<ArrowBuf> getBufs() {
287275
}
288276

289277
private static ArrowMessage frame(BufferAllocator allocator, final InputStream stream) {
290-
291-
try {
292-
FlightDescriptor descriptor = null;
293-
MessageMetadataResult header = null;
294-
ArrowBuf body = null;
295-
ArrowBuf appMetadata = null;
296-
while (stream.available() > 0) {
297-
final int tagFirstByte = stream.read();
298-
if (tagFirstByte == -1) {
299-
break;
300-
}
301-
int tag = readRawVarint32(tagFirstByte, stream);
302-
switch (tag) {
303-
case DESCRIPTOR_TAG:
304-
{
305-
int size = readRawVarint32(stream);
306-
byte[] bytes = new byte[size];
307-
ByteStreams.readFully(stream, bytes);
308-
descriptor = FlightDescriptor.parseFrom(bytes);
309-
break;
310-
}
311-
case HEADER_TAG:
312-
{
313-
int size = readRawVarint32(stream);
314-
byte[] bytes = new byte[size];
315-
ByteStreams.readFully(stream, bytes);
316-
header = MessageMetadataResult.create(ByteBuffer.wrap(bytes), size);
317-
break;
318-
}
319-
case APP_METADATA_TAG:
320-
{
321-
int size = readRawVarint32(stream);
322-
appMetadata = readBuffer(allocator, stream, size);
323-
break;
324-
}
325-
case BODY_TAG:
326-
if (body != null) {
327-
// only read last body.
328-
body.getReferenceManager().release();
329-
body = null;
330-
}
331-
int size = readRawVarint32(stream);
332-
body = readBuffer(allocator, stream, size);
333-
break;
334-
335-
default:
336-
// ignore unknown fields.
337-
}
338-
}
339-
// Protobuf implementations can omit empty fields, such as body; for some message types, like
340-
// RecordBatch,
341-
// this will fail later as we still expect an empty buffer. In those cases only, fill in an
342-
// empty buffer here -
343-
// in other cases, like Schema, having an unexpected empty buffer will also cause failures.
344-
// We don't fill in defaults for fields like header, for which there is no reasonable default,
345-
// or for appMetadata
346-
// or descriptor, which are intended to be empty in some cases.
347-
if (header != null) {
348-
switch (HeaderType.getHeader(header.headerType())) {
349-
case SCHEMA:
350-
// Ignore 0-length buffers in case a Protobuf implementation wrote it out
351-
if (body != null && body.capacity() == 0) {
352-
body.close();
353-
body = null;
354-
}
355-
break;
356-
case DICTIONARY_BATCH:
357-
case RECORD_BATCH:
358-
// A Protobuf implementation can skip 0-length bodies, so ensure we fill it in here
359-
if (body == null) {
360-
body = allocator.getEmpty();
361-
}
362-
break;
363-
case NONE:
364-
case TENSOR:
365-
default:
366-
// Do nothing
367-
break;
368-
}
369-
}
370-
return new ArrowMessage(descriptor, header, appMetadata, body);
371-
} catch (Exception ioe) {
372-
throw new RuntimeException(ioe);
373-
}
374-
}
375-
376-
private static int readRawVarint32(InputStream is) throws IOException {
377-
int firstByte = is.read();
378-
return readRawVarint32(firstByte, is);
379-
}
380-
381-
private static int readRawVarint32(int firstByte, InputStream is) throws IOException {
382-
return CodedInputStream.readRawVarint32(firstByte, is);
383-
}
384-
385-
/**
386-
* Reads data from the stream into an ArrowBuf, without copying data when possible.
387-
*
388-
* <p>First attempts to transfer ownership of the gRPC buffer to Arrow via {@link
389-
* #wrapGrpcBuffer}. This avoids any memory copy when the gRPC transport provides a direct
390-
* ByteBuffer (e.g., Netty).
391-
*
392-
* <p>If not possible (e.g., heap buffer, fragmented data, or unsupported transport), falls back
393-
* to allocating a new buffer and copying data into it.
394-
*
395-
* @param allocator The allocator to use for buffer allocation
396-
* @param stream The input stream to read from
397-
* @param size The number of bytes to read
398-
* @return An ArrowBuf containing the data
399-
* @throws IOException if there is an error reading from the stream
400-
*/
401-
private static ArrowBuf readBuffer(BufferAllocator allocator, InputStream stream, int size)
402-
throws IOException {
278+
FlightDataReader reader;
403279
if (ENABLE_ZERO_COPY_READ) {
404-
ArrowBuf zeroCopyBuf = wrapGrpcBuffer(stream, allocator, size);
405-
if (zeroCopyBuf != null) {
406-
return zeroCopyBuf;
280+
reader = ArrowBufReader.tryArrowBufReader(allocator, stream);
281+
if (reader != null) {
282+
return reader.toMessage();
407283
}
408284
}
409285

410-
// Fall back to allocating and copying
411-
ArrowBuf buf = allocator.buffer(size);
412-
byte[] heapBytes = new byte[size];
413-
ByteStreams.readFully(stream, heapBytes);
414-
buf.writeBytes(heapBytes);
415-
buf.writerIndex(size);
416-
return buf;
417-
}
418-
419-
/**
420-
* Attempts to wrap gRPC's buffer as an ArrowBuf without copying.
421-
*
422-
* <p>This method takes ownership of gRPC's underlying buffer via {@link Detachable#detach()} and
423-
* wraps it as an ArrowBuf using {@link BufferAllocator#wrapForeignAllocation}. The gRPC buffer
424-
* will be released when the ArrowBuf is closed.
425-
*
426-
* @param stream The gRPC-provided InputStream
427-
* @param allocator The allocator to use for wrapping the foreign allocation
428-
* @param size The number of bytes to wrap
429-
* @return An ArrowBuf wrapping gRPC's buffer, or {@code null} if zero-copy is not possible
430-
*/
431-
static ArrowBuf wrapGrpcBuffer(
432-
final InputStream stream, final BufferAllocator allocator, final int size) {
433-
434-
if (!(stream instanceof Detachable) || !(stream instanceof HasByteBuffer)) {
435-
return null;
436-
}
437-
438-
HasByteBuffer hasByteBuffer = (HasByteBuffer) stream;
439-
if (!hasByteBuffer.byteBufferSupported()) {
440-
return null;
441-
}
442-
443-
ByteBuffer peekBuffer = hasByteBuffer.getByteBuffer();
444-
if (peekBuffer == null) {
445-
return null;
446-
}
447-
if (!peekBuffer.isDirect()) {
448-
return null;
449-
}
450-
if (peekBuffer.remaining() < size) {
451-
// Data is fragmented across multiple buffers; zero-copy not possible
452-
return null;
453-
}
454-
455-
// Take ownership
456-
InputStream detachedStream = ((Detachable) stream).detach();
457-
458-
// Get buffer from detached stream
459-
ByteBuffer detachedByteBuffer = ((HasByteBuffer) detachedStream).getByteBuffer();
460-
461-
// Calculate memory address accounting for buffer position
462-
long baseAddress = MemoryUtil.getByteBufferAddress(detachedByteBuffer);
463-
long dataAddress = baseAddress + detachedByteBuffer.position();
464-
465-
// Create ForeignAllocation with proper cleanup
466-
ForeignAllocation foreignAllocation =
467-
new ForeignAllocation(size, dataAddress) {
468-
@Override
469-
protected void release0() {
470-
closeQuietly(detachedStream);
471-
}
472-
};
473-
474-
try {
475-
return allocator.wrapForeignAllocation(foreignAllocation);
476-
} catch (Throwable t) {
477-
// If it fails, clean up the detached stream and propagate
478-
closeQuietly(detachedStream);
479-
throw t;
480-
}
481-
}
482-
483-
private static void closeQuietly(InputStream stream) {
484-
if (stream != null) {
485-
try {
486-
stream.close();
487-
} catch (IOException e) {
488-
LOG.debug("Error closing detached gRPC stream", e);
489-
}
490-
}
286+
reader = new InputStreamReader(allocator, stream);
287+
return reader.toMessage();
491288
}
492289

493290
/**

0 commit comments

Comments
 (0)