Skip to content

Commit b06098a

Browse files
committed
add ArrowMessage.frame tests
1 parent 6555c90 commit b06098a

1 file changed

Lines changed: 359 additions & 0 deletions

File tree

Lines changed: 359 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,359 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.flight;
18+
19+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertNotNull;
22+
23+
import com.google.common.collect.Iterables;
24+
import com.google.common.io.ByteStreams;
25+
import com.google.protobuf.ByteString;
26+
import com.google.protobuf.CodedOutputStream;
27+
import io.grpc.Detachable;
28+
import io.grpc.HasByteBuffer;
29+
import io.grpc.protobuf.ProtoUtils;
30+
import java.io.ByteArrayInputStream;
31+
import java.io.ByteArrayOutputStream;
32+
import java.io.IOException;
33+
import java.io.InputStream;
34+
import java.nio.ByteBuffer;
35+
import java.util.Arrays;
36+
import java.util.List;
37+
import org.apache.arrow.flight.impl.Flight.FlightData;
38+
import org.apache.arrow.flight.impl.Flight.FlightDescriptor;
39+
import org.apache.arrow.memory.ArrowBuf;
40+
import org.apache.arrow.memory.BufferAllocator;
41+
import org.apache.arrow.memory.RootAllocator;
42+
import org.apache.arrow.vector.ipc.message.IpcOption;
43+
import org.apache.arrow.vector.ipc.message.MessageSerializer;
44+
import org.apache.arrow.vector.types.pojo.ArrowType;
45+
import org.apache.arrow.vector.types.pojo.Field;
46+
import org.apache.arrow.vector.types.pojo.Schema;
47+
import org.apache.commons.lang3.tuple.Pair;
48+
import org.junit.jupiter.api.AfterEach;
49+
import org.junit.jupiter.api.BeforeEach;
50+
import org.junit.jupiter.api.Test;
51+
52+
/**
53+
* Tests FlightData parsing for duplicate field handling and well-formed messages. Covers both
54+
* InputStream (with copying) and ArrowBuf (zero-copy) parsing paths. Verifies that duplicate
55+
* protobuf fields use last-occurrence-wins semantics without memory leaks.
56+
*/
57+
public class TestFlightDataParserDuplicateFields {
58+
59+
private BufferAllocator allocator;
60+
61+
@BeforeEach
62+
public void setUp() {
63+
allocator = new RootAllocator(Long.MAX_VALUE);
64+
}
65+
66+
@AfterEach
67+
public void tearDown() {
68+
allocator.close();
69+
}
70+
71+
/** Verifies duplicate app_metadata fields via InputStream path use last-occurrence-wins. */
72+
@Test
73+
public void testDuplicateAppMetadataInputStream() throws Exception {
74+
byte[] firstAppMetadata = new byte[] {1, 2, 3};
75+
byte[] secondAppMetadata = new byte[] {4, 5, 6, 7, 8};
76+
77+
byte[] serialized =
78+
buildFlightDataDescriptors(
79+
List.of(
80+
Pair.of(FlightData.APP_METADATA_FIELD_NUMBER, firstAppMetadata),
81+
Pair.of(FlightData.APP_METADATA_FIELD_NUMBER, secondAppMetadata)));
82+
InputStream stream = new ByteArrayInputStream(serialized);
83+
84+
try (ArrowMessage message = ArrowMessage.createMarshaller(allocator).parse(stream)) {
85+
ArrowBuf appMetadata = message.getApplicationMetadata();
86+
assertNotNull(appMetadata);
87+
// Use readableBytes() instead of capacity() since allocator may round up
88+
assertEquals(secondAppMetadata.length, appMetadata.readableBytes());
89+
90+
byte[] actual = new byte[secondAppMetadata.length];
91+
appMetadata.getBytes(0, actual);
92+
assertArrayEquals(secondAppMetadata, actual);
93+
}
94+
assertEquals(0, allocator.getAllocatedMemory());
95+
}
96+
97+
/**
98+
* Verifies duplicate app_metadata fields via zero-copy ArrowBuf path use last-occurrence-wins.
99+
*/
100+
@Test
101+
public void testDuplicateAppMetadataArrowBuf() throws Exception {
102+
byte[] firstAppMetadata = new byte[] {1, 2, 3};
103+
byte[] secondAppMetadata = new byte[] {4, 5, 6, 7, 8};
104+
105+
// Verify clean start
106+
assertEquals(0, allocator.getAllocatedMemory());
107+
108+
byte[] serialized =
109+
buildFlightDataDescriptors(
110+
List.of(
111+
Pair.of(FlightData.APP_METADATA_FIELD_NUMBER, firstAppMetadata),
112+
Pair.of(FlightData.APP_METADATA_FIELD_NUMBER, secondAppMetadata)));
113+
InputStream stream = new DetachableDirectBufferInputStream(serialized);
114+
115+
try (ArrowMessage message = ArrowMessage.createMarshaller(allocator).parse(stream)) {
116+
ArrowBuf appMetadata = message.getApplicationMetadata();
117+
assertNotNull(appMetadata);
118+
assertEquals(secondAppMetadata.length, appMetadata.readableBytes());
119+
120+
byte[] actual = new byte[secondAppMetadata.length];
121+
appMetadata.getBytes(0, actual);
122+
assertArrayEquals(secondAppMetadata, actual);
123+
124+
// Zero-copy: only the backing buffer (serialized message) should be allocated
125+
assertEquals(serialized.length, allocator.getAllocatedMemory());
126+
}
127+
assertEquals(0, allocator.getAllocatedMemory());
128+
}
129+
130+
/** Verifies duplicate body fields via InputStream path use last-occurrence-wins. */
131+
@Test
132+
public void testDuplicateBodyInputStream() throws Exception {
133+
byte[] firstBody = new byte[] {10, 20, 30};
134+
byte[] secondBody = new byte[] {40, 50, 60, 70};
135+
136+
byte[] serialized =
137+
buildFlightDataDescriptors(
138+
List.of(
139+
Pair.of(FlightData.DATA_BODY_FIELD_NUMBER, firstBody),
140+
Pair.of(FlightData.DATA_BODY_FIELD_NUMBER, secondBody)));
141+
InputStream stream = new ByteArrayInputStream(serialized);
142+
143+
try (ArrowMessage message = ArrowMessage.createMarshaller(allocator).parse(stream)) {
144+
ArrowBuf body = Iterables.getOnlyElement(message.getBufs());
145+
assertNotNull(body);
146+
assertEquals(secondBody.length, body.readableBytes());
147+
148+
byte[] actual = new byte[secondBody.length];
149+
body.getBytes(0, actual);
150+
assertArrayEquals(secondBody, actual);
151+
}
152+
assertEquals(0, allocator.getAllocatedMemory());
153+
}
154+
155+
/** Verifies duplicate body fields via zero-copy ArrowBuf path use last-occurrence-wins. */
156+
@Test
157+
public void testDuplicateBodyArrowBuf() throws Exception {
158+
byte[] firstBody = new byte[] {10, 20, 30};
159+
byte[] secondBody = new byte[] {40, 50, 60, 70};
160+
161+
// Verify clean start
162+
assertEquals(0, allocator.getAllocatedMemory());
163+
164+
byte[] serialized =
165+
buildFlightDataDescriptors(
166+
List.of(
167+
Pair.of(FlightData.DATA_BODY_FIELD_NUMBER, firstBody),
168+
Pair.of(FlightData.DATA_BODY_FIELD_NUMBER, secondBody)));
169+
InputStream stream = new DetachableDirectBufferInputStream(serialized);
170+
171+
try (ArrowMessage message = ArrowMessage.createMarshaller(allocator).parse(stream)) {
172+
ArrowBuf body = Iterables.getOnlyElement(message.getBufs());
173+
assertNotNull(body);
174+
assertEquals(secondBody.length, body.readableBytes());
175+
176+
byte[] actual = new byte[secondBody.length];
177+
body.getBytes(0, actual);
178+
assertArrayEquals(secondBody, actual);
179+
180+
// Zero-copy: only the backing buffer (serialized message) should be allocated
181+
assertEquals(serialized.length, allocator.getAllocatedMemory());
182+
}
183+
assertEquals(0, allocator.getAllocatedMemory());
184+
}
185+
186+
/** Verifies well-formed FlightData message parsing via InputStream path. */
187+
@Test
188+
public void testFieldsInputStream() throws Exception {
189+
byte[] appMetadataBytes = new byte[] {100, 101, 102};
190+
byte[] bodyBytes = new byte[] {50, 51, 52, 53, 54};
191+
FlightDescriptor expectedDescriptor = createTestDescriptor();
192+
193+
byte[] serialized = buildFlightDataWithBothFields(appMetadataBytes, bodyBytes);
194+
InputStream stream = new ByteArrayInputStream(serialized);
195+
196+
try (ArrowMessage message = ArrowMessage.createMarshaller(allocator).parse(stream)) {
197+
// Verify descriptor
198+
assertEquals(expectedDescriptor, message.getDescriptor());
199+
200+
// Verify header is present (Schema message type)
201+
assertEquals(ArrowMessage.HeaderType.SCHEMA, message.getMessageType());
202+
203+
// Verify app metadata
204+
ArrowBuf appMetadata = message.getApplicationMetadata();
205+
assertNotNull(appMetadata);
206+
assertEquals(appMetadataBytes.length, appMetadata.readableBytes());
207+
byte[] actualAppMetadata = new byte[appMetadataBytes.length];
208+
appMetadata.getBytes(0, actualAppMetadata);
209+
assertArrayEquals(appMetadataBytes, actualAppMetadata);
210+
211+
// Verify body
212+
ArrowBuf body = Iterables.getOnlyElement(message.getBufs());
213+
assertNotNull(body);
214+
assertEquals(bodyBytes.length, body.readableBytes());
215+
byte[] actualBody = new byte[bodyBytes.length];
216+
body.getBytes(0, actualBody);
217+
assertArrayEquals(bodyBytes, actualBody);
218+
}
219+
assertEquals(0, allocator.getAllocatedMemory());
220+
}
221+
222+
/** Verifies well-formed FlightData message parsing via zero-copy ArrowBuf path. */
223+
@Test
224+
public void testFieldsArrowBuf() throws Exception {
225+
byte[] appMetadataBytes = new byte[] {100, 101, 102};
226+
byte[] bodyBytes = new byte[] {50, 51, 52, 53, 54};
227+
FlightDescriptor expectedDescriptor = createTestDescriptor();
228+
229+
assertEquals(0, allocator.getAllocatedMemory());
230+
231+
byte[] serialized = buildFlightDataWithBothFields(appMetadataBytes, bodyBytes);
232+
InputStream stream = new DetachableDirectBufferInputStream(serialized);
233+
234+
try (ArrowMessage message = ArrowMessage.createMarshaller(allocator).parse(stream)) {
235+
// Verify descriptor
236+
assertEquals(expectedDescriptor, message.getDescriptor());
237+
238+
// Verify header is present (Schema message type)
239+
assertEquals(ArrowMessage.HeaderType.SCHEMA, message.getMessageType());
240+
241+
// Verify app metadata
242+
ArrowBuf appMetadata = message.getApplicationMetadata();
243+
assertNotNull(appMetadata);
244+
assertEquals(appMetadataBytes.length, appMetadata.readableBytes());
245+
byte[] actualAppMetadata = new byte[appMetadataBytes.length];
246+
appMetadata.getBytes(0, actualAppMetadata);
247+
assertArrayEquals(appMetadataBytes, actualAppMetadata);
248+
249+
// Verify body
250+
ArrowBuf body = Iterables.getOnlyElement(message.getBufs());
251+
assertNotNull(body);
252+
assertEquals(bodyBytes.length, body.readableBytes());
253+
byte[] actualBody = new byte[bodyBytes.length];
254+
body.getBytes(0, actualBody);
255+
assertArrayEquals(bodyBytes, actualBody);
256+
257+
// Zero-copy: only the backing buffer (serialized message) should be allocated
258+
assertEquals(serialized.length, allocator.getAllocatedMemory());
259+
}
260+
assertEquals(0, allocator.getAllocatedMemory());
261+
}
262+
263+
// Helper methods to build complete FlightData messages
264+
265+
private FlightDescriptor createTestDescriptor() {
266+
return FlightDescriptor.newBuilder()
267+
.setType(FlightDescriptor.DescriptorType.PATH)
268+
.addPath("test")
269+
.addPath("path")
270+
.build();
271+
}
272+
273+
private byte[] createSchemaHeader() {
274+
Schema schema =
275+
new Schema(
276+
Arrays.asList(
277+
Field.nullable("id", new ArrowType.Int(32, true)),
278+
Field.nullable("name", new ArrowType.Utf8())));
279+
ByteBuffer headerBuffer = MessageSerializer.serializeMetadata(schema, IpcOption.DEFAULT);
280+
byte[] headerBytes = new byte[headerBuffer.remaining()];
281+
headerBuffer.get(headerBytes);
282+
return headerBytes;
283+
}
284+
285+
private byte[] buildFlightDataWithBothFields(byte[] appMetadata, byte[] body) throws IOException {
286+
FlightData flightData =
287+
FlightData.newBuilder()
288+
.setFlightDescriptor(createTestDescriptor())
289+
.setDataHeader(ByteString.copyFrom(createSchemaHeader()))
290+
.setAppMetadata(ByteString.copyFrom(appMetadata))
291+
.setDataBody(ByteString.copyFrom(body))
292+
.build();
293+
try (InputStream grpcStream =
294+
ProtoUtils.marshaller(FlightData.getDefaultInstance()).stream(flightData)) {
295+
return ByteStreams.toByteArray(grpcStream);
296+
}
297+
}
298+
299+
// Helper methods to build FlightData messages with duplicate fields
300+
301+
private byte[] buildFlightDataDescriptors(List<Pair<Integer, byte[]>> descriptors) throws IOException {
302+
303+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
304+
CodedOutputStream cos = CodedOutputStream.newInstance(baos);
305+
306+
for (Pair<Integer, byte[]> descriptor : descriptors) {
307+
cos.writeBytes(descriptor.getKey(), ByteString.copyFrom(descriptor.getValue()));
308+
}
309+
cos.flush();
310+
return baos.toByteArray();
311+
}
312+
313+
/** Mock InputStream implementing gRPC's Detachable and HasByteBuffer for testing zero-copy. */
314+
private static class DetachableDirectBufferInputStream extends InputStream
315+
implements Detachable, HasByteBuffer {
316+
private ByteBuffer buffer;
317+
318+
DetachableDirectBufferInputStream(byte[] data) {
319+
this.buffer = ByteBuffer.allocateDirect(data.length);
320+
this.buffer.put(data).flip();
321+
}
322+
323+
private DetachableDirectBufferInputStream(ByteBuffer buffer) {
324+
this.buffer = buffer;
325+
}
326+
327+
@Override
328+
public boolean byteBufferSupported() {
329+
return true;
330+
}
331+
332+
@Override
333+
public ByteBuffer getByteBuffer() {
334+
return buffer;
335+
}
336+
337+
@Override
338+
public InputStream detach() {
339+
ByteBuffer detached = this.buffer;
340+
this.buffer = null;
341+
return new DetachableDirectBufferInputStream(detached);
342+
}
343+
344+
@Override
345+
public int read() {
346+
return (buffer != null && buffer.hasRemaining()) ? (buffer.get() & 0xFF) : -1;
347+
}
348+
349+
@Override
350+
public int available() {
351+
return buffer == null ? 0 : buffer.remaining();
352+
}
353+
354+
@Override
355+
public void close() {
356+
buffer = null;
357+
}
358+
}
359+
}

0 commit comments

Comments
 (0)