5050import org .junit .jupiter .api .Test ;
5151
5252/**
53- * Tests FlightData parsing for duplicate field handling and well-formed messages. Covers both
54- * InputStream (with copying) and ArrowBuf (zero-copy) parsing paths. Verifies that duplicate
55- * protobuf fields use last-occurrence-wins semantics without memory leaks.
53+ * Tests FlightData parsing including duplicate field handling, well-formed messages, and zero-copy
54+ * behavior. Covers both InputStream (with copying) and ArrowBuf (zero-copy) parsing paths. Verifies
55+ * that duplicate protobuf fields use last-occurrence-wins semantics without memory leaks.
5656 */
57- public class TestFlightDataParserDuplicateFields {
57+ public class TestArrowMessageParse {
5858
5959 private BufferAllocator allocator ;
6060
@@ -110,7 +110,7 @@ public void testDuplicateAppMetadataArrowBuf() throws Exception {
110110 List .of (
111111 Pair .of (FlightData .APP_METADATA_FIELD_NUMBER , firstAppMetadata ),
112112 Pair .of (FlightData .APP_METADATA_FIELD_NUMBER , secondAppMetadata )));
113- InputStream stream = new DetachableDirectBufferInputStream (serialized );
113+ InputStream stream = MockGrpcInputStream . ofDirectBuffer (serialized );
114114
115115 try (ArrowMessage message = ArrowMessage .createMarshaller (allocator ).parse (stream )) {
116116 ArrowBuf appMetadata = message .getApplicationMetadata ();
@@ -166,7 +166,7 @@ public void testDuplicateBodyArrowBuf() throws Exception {
166166 List .of (
167167 Pair .of (FlightData .DATA_BODY_FIELD_NUMBER , firstBody ),
168168 Pair .of (FlightData .DATA_BODY_FIELD_NUMBER , secondBody )));
169- InputStream stream = new DetachableDirectBufferInputStream (serialized );
169+ InputStream stream = MockGrpcInputStream . ofDirectBuffer (serialized );
170170
171171 try (ArrowMessage message = ArrowMessage .createMarshaller (allocator ).parse (stream )) {
172172 ArrowBuf body = Iterables .getOnlyElement (message .getBufs ());
@@ -229,7 +229,7 @@ public void testFieldsArrowBuf() throws Exception {
229229 assertEquals (0 , allocator .getAllocatedMemory ());
230230
231231 byte [] serialized = buildFlightDataWithBothFields (appMetadataBytes , bodyBytes );
232- InputStream stream = new DetachableDirectBufferInputStream (serialized );
232+ InputStream stream = MockGrpcInputStream . ofDirectBuffer (serialized );
233233
234234 try (ArrowMessage message = ArrowMessage .createMarshaller (allocator ).parse (stream )) {
235235 // Verify descriptor
@@ -260,6 +260,21 @@ public void testFieldsArrowBuf() throws Exception {
260260 assertEquals (0 , allocator .getAllocatedMemory ());
261261 }
262262
263+ /** Verifies that heap buffers fall back to InputStream path without calling detach(). */
264+ @ Test
265+ public void testHeapBufferFallbackDoesNotDetach () throws Exception {
266+ byte [] appMetadataBytes = new byte [] {8 , 9 };
267+ byte [] bodyBytes = new byte [] {10 , 11 , 12 };
268+
269+ byte [] serialized = buildFlightDataWithBothFields (appMetadataBytes , bodyBytes );
270+ MockGrpcInputStream stream = MockGrpcInputStream .ofHeapBuffer (serialized );
271+
272+ try (ArrowMessage message = ArrowMessage .createMarshaller (allocator ).parse (stream )) {
273+ assertNotNull (message .getDescriptor ());
274+ assertEquals (0 , stream .getDetachCount ());
275+ }
276+ }
277+
263278 // Helper methods to build complete FlightData messages
264279
265280 private FlightDescriptor createTestDescriptor () {
@@ -298,7 +313,8 @@ private byte[] buildFlightDataWithBothFields(byte[] appMetadata, byte[] body) th
298313
299314 // Helper methods to build FlightData messages with duplicate fields
300315
301- private byte [] buildFlightDataDescriptors (List <Pair <Integer , byte []>> descriptors ) throws IOException {
316+ private byte [] buildFlightDataDescriptors (List <Pair <Integer , byte []>> descriptors )
317+ throws IOException {
302318
303319 ByteArrayOutputStream baos = new ByteArrayOutputStream ();
304320 CodedOutputStream cos = CodedOutputStream .newInstance (baos );
@@ -311,34 +327,47 @@ private byte[] buildFlightDataDescriptors(List<Pair<Integer, byte[]>> descriptor
311327 }
312328
313329 /** Mock InputStream implementing gRPC's Detachable and HasByteBuffer for testing zero-copy. */
314- private static class DetachableDirectBufferInputStream extends InputStream
330+ private static class MockGrpcInputStream extends InputStream
315331 implements Detachable , HasByteBuffer {
316332 private ByteBuffer buffer ;
333+ private final boolean byteBufferSupported ;
334+ private int detachCount ;
317335
318- DetachableDirectBufferInputStream ( byte [] data ) {
319- this .buffer = ByteBuffer . allocateDirect ( data . length ) ;
320- this .buffer . put ( data ). flip () ;
336+ private MockGrpcInputStream ( ByteBuffer buffer , boolean byteBufferSupported ) {
337+ this .buffer = buffer ;
338+ this .byteBufferSupported = byteBufferSupported ;
321339 }
322340
323- private DetachableDirectBufferInputStream (ByteBuffer buffer ) {
324- this .buffer = buffer ;
341+ static MockGrpcInputStream ofDirectBuffer (byte [] data ) {
342+ ByteBuffer buf = ByteBuffer .allocateDirect (data .length );
343+ buf .put (data ).flip ();
344+ return new MockGrpcInputStream (buf , true );
345+ }
346+
347+ static MockGrpcInputStream ofHeapBuffer (byte [] data ) {
348+ return new MockGrpcInputStream (ByteBuffer .wrap (data ), true );
325349 }
326350
327351 @ Override
328352 public boolean byteBufferSupported () {
329- return true ;
353+ return byteBufferSupported ;
330354 }
331355
332356 @ Override
333357 public ByteBuffer getByteBuffer () {
334- return buffer ;
358+ return byteBufferSupported ? buffer : null ;
335359 }
336360
337361 @ Override
338362 public InputStream detach () {
363+ detachCount ++;
339364 ByteBuffer detached = this .buffer ;
340365 this .buffer = null ;
341- return new DetachableDirectBufferInputStream (detached );
366+ return new MockGrpcInputStream (detached , byteBufferSupported );
367+ }
368+
369+ int getDetachCount () {
370+ return detachCount ;
342371 }
343372
344373 @ Override
0 commit comments