Skip to content

Commit cfce7d9

Browse files
authored
fix: prevent NPE when isFinished() is called before DataDriver init (#17440)
1 parent 9c9eae1 commit cfce7d9

4 files changed

Lines changed: 131 additions & 1 deletion

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ protected boolean init(SettableFuture<?> blockedFuture) {
7474
return true;
7575
}
7676

77+
@Override
78+
public boolean isInit() {
79+
return init;
80+
}
81+
7782
/**
7883
* Init seq file list and unseq file list in {@link
7984
* org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ private boolean isFinishedInternal() {
214214
finished =
215215
state.get() != State.ALIVE
216216
|| driverContext.isDone()
217-
|| root.isFinished()
217+
|| (isInit() && root.isFinished())
218218
|| sink.isClosed();
219219
} catch (Exception e) {
220220
throw new RuntimeException(e);
@@ -225,6 +225,8 @@ private boolean isFinishedInternal() {
225225
return finished;
226226
}
227227

228+
abstract boolean isInit();
229+
228230
@SuppressWarnings({"squid:S1181", "squid:S112"})
229231
private ListenableFuture<?> processInternal() {
230232
long startTimeNanos = System.nanoTime();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ protected boolean init(SettableFuture<?> blockedFuture) {
4242
return true;
4343
}
4444

45+
@Override
46+
boolean isInit() {
47+
return true;
48+
}
49+
4550
@Override
4651
protected void releaseResource() {
4752
driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
3636
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
3737
import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
38+
import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
3839
import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
40+
import org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator;
3941
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
4042
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
4143
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
@@ -254,4 +256,120 @@ public void batchTest() {
254256
instanceNotificationExecutor.shutdown();
255257
}
256258
}
259+
260+
@Test
261+
public void testCallIsFinishedBeforeDataSourcePrepared() {
262+
ExecutorService instanceNotificationExecutor =
263+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
264+
try {
265+
IFullPath measurementPath1 =
266+
new NonAlignedFullPath(
267+
IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + ".device0"),
268+
new MeasurementSchema("sensor0", TSDataType.INT32));
269+
Set<String> allSensors = new HashSet<>();
270+
allSensors.add("sensor0");
271+
allSensors.add("sensor1");
272+
QueryId queryId = new QueryId("stub_query");
273+
FragmentInstanceId instanceId =
274+
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
275+
FragmentInstanceStateMachine stateMachine =
276+
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
277+
DataRegion dataRegion = Mockito.mock(DataRegion.class);
278+
Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true);
279+
FragmentInstanceContext fragmentInstanceContext =
280+
createFragmentInstanceContext(instanceId, stateMachine);
281+
fragmentInstanceContext.setDataRegion(dataRegion);
282+
DataDriverContext driverContext = new DataDriverContext(fragmentInstanceContext, 0);
283+
PlanNodeId planNodeId1 = new PlanNodeId("1");
284+
driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
285+
PlanNodeId planNodeId2 = new PlanNodeId("2");
286+
driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
287+
driverContext.addOperatorContext(
288+
3, new PlanNodeId("3"), FullOuterTimeJoinOperator.class.getSimpleName());
289+
driverContext.addOperatorContext(4, new PlanNodeId("4"), LimitOperator.class.getSimpleName());
290+
291+
SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
292+
scanOptionsBuilder.withAllSensors(allSensors);
293+
SeriesScanOperator seriesScanOperator1 =
294+
new SeriesScanOperator(
295+
driverContext.getOperatorContexts().get(0),
296+
planNodeId1,
297+
measurementPath1,
298+
Ordering.ASC,
299+
scanOptionsBuilder.build());
300+
driverContext.addSourceOperator(seriesScanOperator1);
301+
driverContext.addPath(measurementPath1);
302+
seriesScanOperator1
303+
.getOperatorContext()
304+
.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
305+
306+
IFullPath measurementPath2 =
307+
new NonAlignedFullPath(
308+
IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + ".device0"),
309+
new MeasurementSchema("sensor1", TSDataType.INT32));
310+
SeriesScanOperator seriesScanOperator2 =
311+
new SeriesScanOperator(
312+
driverContext.getOperatorContexts().get(1),
313+
planNodeId2,
314+
measurementPath2,
315+
Ordering.ASC,
316+
scanOptionsBuilder.build());
317+
driverContext.addSourceOperator(seriesScanOperator2);
318+
driverContext.addPath(measurementPath2);
319+
320+
seriesScanOperator2
321+
.getOperatorContext()
322+
.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
323+
324+
LeftOuterTimeJoinOperator timeJoinOperator =
325+
new LeftOuterTimeJoinOperator(
326+
driverContext.getOperatorContexts().get(2),
327+
seriesScanOperator1,
328+
1,
329+
seriesScanOperator2,
330+
Arrays.asList(TSDataType.INT32, TSDataType.INT32),
331+
new AscTimeComparator());
332+
SingleDeviceViewOperator fakeOperator =
333+
new SingleDeviceViewOperator(
334+
driverContext.getOperatorContexts().get(3),
335+
"d1",
336+
timeJoinOperator,
337+
Arrays.asList(0),
338+
Arrays.asList(TSDataType.INT32, TSDataType.INT32));
339+
fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
340+
341+
fragmentInstanceContext.setSourcePaths(driverContext.getPaths());
342+
String deviceId = DATA_DRIVER_TEST_SG + ".device0";
343+
Mockito.when(
344+
dataRegion.query(
345+
eq(driverContext.getPaths()),
346+
eq(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId)),
347+
eq(fragmentInstanceContext),
348+
Mockito.isNull(),
349+
Mockito.isNull(),
350+
Mockito.anyLong()))
351+
.thenReturn(null);
352+
fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
353+
fragmentInstanceContext.initializeNumOfDrivers(1);
354+
355+
StubSink stubSink = new StubSink(fragmentInstanceContext);
356+
driverContext.setSink(stubSink);
357+
IDriver dataDriver = null;
358+
try {
359+
dataDriver = new DataDriver(fakeOperator, driverContext, 0);
360+
assertEquals(
361+
fragmentInstanceContext.getId(), dataDriver.getDriverTaskId().getFragmentInstanceId());
362+
assertFalse(dataDriver.isFinished());
363+
} finally {
364+
if (dataDriver != null) {
365+
dataDriver.close();
366+
}
367+
}
368+
} catch (QueryProcessException e) {
369+
e.printStackTrace();
370+
fail();
371+
} finally {
372+
instanceNotificationExecutor.shutdown();
373+
}
374+
}
257375
}

0 commit comments

Comments
 (0)