Skip to content

Commit 33d9188

Browse files
committed
mock ReadableBuffer to not depend on grpc-core
1 parent a6503c2 commit 33d9188

1 file changed

Lines changed: 71 additions & 39 deletions

File tree

flight/flight-core/src/test/java/org/apache/arrow/flight/TestArrowMessageZeroCopy.java

Lines changed: 71 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626

2727
import io.grpc.Detachable;
2828
import io.grpc.HasByteBuffer;
29-
import io.grpc.internal.ReadableBuffer;
30-
import io.grpc.internal.ReadableBuffers;
3129
import java.io.ByteArrayInputStream;
3230
import java.io.IOException;
3331
import java.io.InputStream;
@@ -54,14 +52,6 @@ public void tearDown() {
5452
allocator.close();
5553
}
5654

57-
private static InputStream createGrpcStreamWithDirectBuffer(byte[] data) {
58-
ByteBuffer directBuffer = ByteBuffer.allocateDirect(data.length);
59-
directBuffer.put(data);
60-
directBuffer.flip();
61-
ReadableBuffer readableBuffer = ReadableBuffers.wrap(directBuffer);
62-
return ReadableBuffers.openStream(readableBuffer, true);
63-
}
64-
6555
@Test
6656
public void testWrapGrpcBufferReturnsNullForRegularInputStream() throws IOException {
6757
byte[] testData = new byte[] {1, 2, 3, 4, 5};
@@ -73,9 +63,9 @@ public void testWrapGrpcBufferReturnsNullForRegularInputStream() throws IOExcept
7363
}
7464

7565
@Test
76-
public void testWrapGrpcBufferSucceedsForRealGrpcDirectBuffer() throws IOException {
66+
public void testWrapGrpcBufferSucceedsForDirectBuffer() throws IOException {
7767
byte[] testData = new byte[] {11, 22, 33, 44, 55};
78-
InputStream stream = createGrpcStreamWithDirectBuffer(testData);
68+
InputStream stream = MockGrpcInputStream.ofDirectBuffer(testData);
7969

8070
assertInstanceOf(Detachable.class, stream, "Real gRPC stream should implement Detachable");
8171
assertInstanceOf(
@@ -88,7 +78,7 @@ public void testWrapGrpcBufferSucceedsForRealGrpcDirectBuffer() throws IOExcepti
8878
"Should have direct ByteBuffer backing");
8979

9080
try (ArrowBuf result = ArrowMessage.wrapGrpcBuffer(stream, allocator, testData.length)) {
91-
assertNotNull(result, "Should succeed for real gRPC stream with direct buffer");
81+
assertNotNull(result, "Should succeed for gRPC stream with direct buffer");
9282
assertEquals(testData.length, result.capacity());
9383

9484
// Check received data is the same
@@ -101,10 +91,7 @@ public void testWrapGrpcBufferSucceedsForRealGrpcDirectBuffer() throws IOExcepti
10191
@Test
10292
public void testWrapGrpcBufferReturnsNullForRealGrpcHeapByteBuffer() throws IOException {
10393
byte[] testData = new byte[] {1, 2, 3, 4, 5};
104-
ByteBuffer heapBuffer = ByteBuffer.wrap(testData);
105-
ReadableBuffer readableBuffer = ReadableBuffers.wrap(heapBuffer);
106-
107-
InputStream stream = ReadableBuffers.openStream(readableBuffer, true);
94+
InputStream stream = MockGrpcInputStream.ofHeapBuffer(testData);
10895

10996
assertInstanceOf(Detachable.class, stream, "Real gRPC stream should implement Detachable");
11097
assertInstanceOf(
@@ -117,14 +104,13 @@ public void testWrapGrpcBufferReturnsNullForRealGrpcHeapByteBuffer() throws IOEx
117104

118105
// Zero-copy should return null for heap buffer (not direct)
119106
ArrowBuf result = ArrowMessage.wrapGrpcBuffer(stream, allocator, testData.length);
120-
assertNull(result, "Should return null for real gRPC stream with heap buffer");
107+
assertNull(result, "Should return null for gRPC stream with heap buffer");
121108
}
122109

123110
@Test
124-
public void testWrapGrpcBufferReturnsNullForRealGrpcByteArrayStream() throws IOException {
111+
public void testWrapGrpcBufferReturnsNullWhenByteBufferNotSupported() throws IOException {
125112
byte[] testData = new byte[] {1, 2, 3, 4, 5};
126-
ReadableBuffer readableBuffer = ReadableBuffers.wrap(testData);
127-
InputStream stream = ReadableBuffers.openStream(readableBuffer, true);
113+
InputStream stream = MockGrpcInputStream.withoutByteBufferSupport(testData);
128114

129115
// Verify the stream has the expected gRPC interfaces
130116
assertInstanceOf(Detachable.class, stream, "Real gRPC stream should implement Detachable");
@@ -137,51 +123,44 @@ public void testWrapGrpcBufferReturnsNullForRealGrpcByteArrayStream() throws IOE
137123

138124
// Zero-copy should return null when byteBufferSupported() is false
139125
ArrowBuf result = ArrowMessage.wrapGrpcBuffer(stream, allocator, testData.length);
140-
assertNull(result, "Should return null for real gRPC stream backed by byte array");
126+
assertNull(result, "Should return null for gRPC stream without ByteBuffer support");
141127
}
142128

143129
@Test
144-
public void testWrapGrpcBufferMemoryAccountingWithRealGrpcStream() throws IOException {
130+
public void testWrapGrpcBufferMemoryAccounting() throws IOException {
145131
byte[] testData = new byte[1024];
146132
new Random(42).nextBytes(testData);
147-
InputStream stream = createGrpcStreamWithDirectBuffer(testData);
133+
InputStream stream = MockGrpcInputStream.ofDirectBuffer(testData);
148134

149-
long memoryBefore = allocator.getAllocatedMemory();
150-
assertEquals(0, memoryBefore);
135+
assertEquals(0, allocator.getAllocatedMemory());
151136

152137
ArrowBuf result = ArrowMessage.wrapGrpcBuffer(stream, allocator, testData.length);
153-
assertNotNull(result, "Should succeed for real gRPC stream with direct buffer");
154-
155-
long memoryDuring = allocator.getAllocatedMemory();
156-
assertEquals(testData.length, memoryDuring);
138+
assertNotNull(result, "Should succeed for gRPC stream with direct buffer");
139+
assertEquals(testData.length, allocator.getAllocatedMemory());
157140

158141
byte[] readData = new byte[testData.length];
159142
result.getBytes(0, readData);
160143
assertArrayEquals(testData, readData);
161144

162145
result.close();
163-
164-
long memoryAfter = allocator.getAllocatedMemory();
165-
assertEquals(0, memoryAfter);
146+
assertEquals(0, allocator.getAllocatedMemory());
166147
}
167148

168149
@Test
169-
public void testWrapGrpcBufferReturnsNullForInsufficientDataWithRealGrpcStream()
170-
throws IOException {
150+
public void testWrapGrpcBufferReturnsNullForInsufficientData() throws IOException {
171151
byte[] testData = new byte[] {1, 2, 3};
172-
InputStream stream = createGrpcStreamWithDirectBuffer(testData);
152+
InputStream stream = MockGrpcInputStream.ofDirectBuffer(testData);
173153

174154
// Request more data than available
175155
ArrowBuf result = ArrowMessage.wrapGrpcBuffer(stream, allocator, 10);
176156
assertNull(result, "Should return null when buffer has insufficient data");
177157
}
178158

179159
@Test
180-
public void testWrapGrpcBufferLargeDataWithRealGrpcStream() throws IOException {
181-
// Test with larger data (64KB)
160+
public void testWrapGrpcBufferLargeData() throws IOException {
182161
byte[] testData = new byte[64 * 1024];
183162
new Random(42).nextBytes(testData);
184-
InputStream stream = createGrpcStreamWithDirectBuffer(testData);
163+
InputStream stream = MockGrpcInputStream.ofDirectBuffer(testData);
185164

186165
try (ArrowBuf result = ArrowMessage.wrapGrpcBuffer(stream, allocator, testData.length)) {
187166
assertNotNull(result, "Should succeed for large data with real gRPC stream");
@@ -193,4 +172,57 @@ public void testWrapGrpcBufferLargeDataWithRealGrpcStream() throws IOException {
193172
assertArrayEquals(testData, readData);
194173
}
195174
}
175+
176+
/** Mock InputStream implementing gRPC's Detachable and HasByteBuffer for testing zero-copy. */
177+
private static class MockGrpcInputStream extends InputStream
178+
implements Detachable, HasByteBuffer {
179+
private ByteBuffer buffer;
180+
private final boolean byteBufferSupported;
181+
182+
private MockGrpcInputStream(ByteBuffer buffer, boolean byteBufferSupported) {
183+
this.buffer = buffer;
184+
this.byteBufferSupported = byteBufferSupported;
185+
}
186+
187+
static MockGrpcInputStream ofDirectBuffer(byte[] data) {
188+
ByteBuffer buf = ByteBuffer.allocateDirect(data.length);
189+
buf.put(data).flip();
190+
return new MockGrpcInputStream(buf, true);
191+
}
192+
193+
static MockGrpcInputStream ofHeapBuffer(byte[] data) {
194+
return new MockGrpcInputStream(ByteBuffer.wrap(data), true);
195+
}
196+
197+
static MockGrpcInputStream withoutByteBufferSupport(byte[] data) {
198+
return new MockGrpcInputStream(ByteBuffer.wrap(data), false);
199+
}
200+
201+
@Override
202+
public boolean byteBufferSupported() {
203+
return byteBufferSupported;
204+
}
205+
206+
@Override
207+
public ByteBuffer getByteBuffer() {
208+
return byteBufferSupported ? buffer : null;
209+
}
210+
211+
@Override
212+
public InputStream detach() {
213+
ByteBuffer detached = this.buffer;
214+
this.buffer = null;
215+
return new MockGrpcInputStream(detached, byteBufferSupported);
216+
}
217+
218+
@Override
219+
public int read() {
220+
return (buffer != null && buffer.hasRemaining()) ? (buffer.get() & 0xFF) : -1;
221+
}
222+
223+
@Override
224+
public void close() {
225+
buffer = null;
226+
}
227+
}
196228
}

0 commit comments

Comments
 (0)