|
27 | 27 | import java.time.Duration; |
28 | 28 | import java.util.HashMap; |
29 | 29 | import java.util.Map; |
| 30 | +import java.util.concurrent.CountDownLatch; |
30 | 31 | import java.util.concurrent.Executor; |
31 | 32 | import java.util.concurrent.Executors; |
32 | 33 | import java.util.concurrent.TimeUnit; |
|
39 | 40 | import org.apache.arrow.flight.CallOption; |
40 | 41 | import org.apache.arrow.flight.CallOptions; |
41 | 42 | import org.apache.arrow.flight.CallStatus; |
| 43 | +import org.apache.arrow.flight.FlightProducer.CallContext; |
| 44 | +import org.apache.arrow.flight.FlightProducer.ServerStreamListener; |
42 | 45 | import org.apache.arrow.flight.FlightRuntimeException; |
43 | 46 | import org.apache.arrow.flight.FlightServer; |
| 47 | +import org.apache.arrow.flight.NoOpFlightProducer; |
| 48 | +import org.apache.arrow.flight.Ticket; |
44 | 49 | import org.apache.arrow.flight.impl.FlightServiceGrpc; |
45 | 50 | import org.apache.arrow.memory.BufferAllocator; |
46 | 51 | import org.apache.arrow.memory.RootAllocator; |
47 | 52 | import org.apache.arrow.vector.VectorSchemaRoot; |
48 | 53 | import org.assertj.core.api.Assertions; |
49 | 54 | import org.junit.jupiter.api.BeforeEach; |
50 | 55 | import org.junit.jupiter.api.Test; |
| 56 | +import org.junit.jupiter.api.Timeout; |
51 | 57 |
|
52 | 58 | import com.influxdb.v3.client.InfluxDBClient; |
53 | 59 | import com.influxdb.v3.client.PointValues; |
@@ -186,6 +192,62 @@ void setInboundMessageSizeLarge() throws Exception { |
186 | 192 | } |
187 | 193 | } |
188 | 194 |
|
| 195 | + @Test |
| 196 | + @Timeout(5) |
| 197 | + void queryTimeout() throws Exception { |
| 198 | + int freePort = findFreePort(); |
| 199 | + URI uri = URI.create("http://127.0.0.1:" + freePort); |
| 200 | + CountDownLatch serverStreamFinished = new CountDownLatch(1); |
| 201 | + try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(1, 1); |
| 202 | + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); |
| 203 | + FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, new NoOpFlightProducer() { |
| 204 | + @Override |
| 205 | + public void getStream(final CallContext context, |
| 206 | + final Ticket ticket, |
| 207 | + final ServerStreamListener listener) { |
| 208 | + listener.start(vectorSchemaRoot); |
| 209 | + try { |
| 210 | + Thread.sleep(1000); |
| 211 | + if (!context.isCancelled() && !listener.isCancelled()) { |
| 212 | + listener.completed(); |
| 213 | + } |
| 214 | + } catch (InterruptedException e) { |
| 215 | + Thread.currentThread().interrupt(); |
| 216 | + } finally { |
| 217 | + serverStreamFinished.countDown(); |
| 218 | + } |
| 219 | + } |
| 220 | + }) |
| 221 | + ) { |
| 222 | + flightServer.start(); |
| 223 | + |
| 224 | + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); |
| 225 | + ClientConfig clientConfig = new ClientConfig.Builder() |
| 226 | + .host(host) |
| 227 | + .database("test") |
| 228 | + .queryTimeout(Duration.ofMillis(200)) |
| 229 | + .build(); |
| 230 | + |
| 231 | + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { |
| 232 | + Throwable thrown = Assertions.catchThrowable(() -> { |
| 233 | + try (Stream<PointValues> stream = influxDBClient.queryPoints( |
| 234 | + "Select * from \"nothing\"" |
| 235 | + )) { |
| 236 | + stream.count(); |
| 237 | + } |
| 238 | + }); |
| 239 | + |
| 240 | + Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class); |
| 241 | + FlightRuntimeException fre = (FlightRuntimeException) thrown; |
| 242 | + Assertions.assertThat(fre.status().code()).isEqualTo(CallStatus.TIMED_OUT.code()); |
| 243 | + } |
| 244 | + |
| 245 | + Assertions.assertThat(serverStreamFinished.await(2, TimeUnit.SECONDS)).isTrue(); |
| 246 | + flightServer.shutdown(); |
| 247 | + flightServer.awaitTermination(2, TimeUnit.SECONDS); |
| 248 | + } |
| 249 | + } |
| 250 | + |
189 | 251 | @Test |
190 | 252 | void defaultGrpcCallOptions() { |
191 | 253 | GrpcCallOptions grpcCallOptions = new QueryOptions("test").grpcCallOptions(); |
|
0 commit comments