Skip to content

Commit bb68d50

Browse files
authored
Fix kill query doesn't take effect bug (#17358)
1 parent 5095862 commit bb68d50

10 files changed

Lines changed: 206 additions & 82 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@
245245
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
246246
import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
247247
import static org.apache.iotdb.rpc.RpcUtils.TIME_PRECISION;
248+
import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED;
248249

249250
public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
250251

@@ -286,6 +287,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
286287
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
287288
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
288289

290+
private static final String NO_QUERY_EXECUTION_ERR_MSG =
291+
"Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more details in server log.";
292+
289293
@FunctionalInterface
290294
public interface SelectResult {
291295

@@ -1526,15 +1530,18 @@ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
15261530
finished = true;
15271531
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
15281532
}
1529-
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
15301533

15311534
queryExecution = COORDINATOR.getQueryExecution(req.queryId);
15321535

15331536
if (queryExecution == null) {
1534-
resp.setHasResultSet(false);
1535-
resp.setMoreData(false);
1536-
return resp;
1537+
TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode());
1538+
noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
1539+
return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
15371540
}
1541+
1542+
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
1543+
1544+
queryExecution.updateCurrentRpcStartTime(startTime);
15381545
statementType = queryExecution.getStatementType();
15391546

15401547
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
@@ -2272,16 +2279,16 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
22722279
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
22732280
}
22742281

2275-
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
2276-
22772282
queryExecution = COORDINATOR.getQueryExecution(req.queryId);
22782283
if (queryExecution == null) {
2279-
resp.setHasResultSet(false);
2280-
resp.setMoreData(true);
2281-
return resp;
2284+
TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode());
2285+
noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
2286+
return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
22822287
}
2288+
queryExecution.updateCurrentRpcStartTime(startTime);
22832289
statementType = queryExecution.getStatementType();
22842290

2291+
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
22852292
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
22862293
Pair<TSQueryDataSet, Boolean> pair =
22872294
convertTsBlockByFetchSize(queryExecution, req.fetchSize);
@@ -2291,7 +2298,7 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
22912298
resp.setHasResultSet(hasResultSet);
22922299
resp.setQueryDataSet(result);
22932300
resp.setIsAlign(true);
2294-
resp.setMoreData(finished);
2301+
resp.setMoreData(!finished);
22952302
return resp;
22962303
}
22972304
} catch (Exception e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,11 @@ public enum ExplainType {
7676
private long localQueryId;
7777
private SessionInfo session;
7878
private QueryType queryType = QueryType.READ;
79+
80+
/** the max executing time of query in ms. Unit: millisecond */
7981
private long timeOut;
82+
83+
// time unit is ms
8084
private long startTime;
8185

8286
private TEndPoint localDataBlockEndpoint;
@@ -147,6 +151,7 @@ public enum ExplainType {
147151
// Tables in the subquery
148152
private final Map<NodeRef<Query>, List<Identifier>> subQueryTables = new HashMap<>();
149153

154+
@TestOnly
150155
public MPPQueryContext(QueryId queryId) {
151156
this.queryId = queryId;
152157
this.endPointBlackList = ConcurrentHashMap.newKeySet();
@@ -161,12 +166,7 @@ public MPPQueryContext(
161166
SessionInfo session,
162167
TEndPoint localDataBlockEndpoint,
163168
TEndPoint localInternalEndpoint) {
164-
this(queryId);
165-
this.sql = sql;
166-
this.session = session;
167-
this.localDataBlockEndpoint = localDataBlockEndpoint;
168-
this.localInternalEndpoint = localInternalEndpoint;
169-
this.initResultNodeContext();
169+
this(sql, queryId, -1, session, localDataBlockEndpoint, localInternalEndpoint);
170170
}
171171

172172
public MPPQueryContext(
@@ -244,10 +244,12 @@ public QueryType getQueryType() {
244244
return queryType;
245245
}
246246

247+
/** the max executing time of query in ms. Unit: millisecond */
247248
public long getTimeOut() {
248249
return timeOut;
249250
}
250251

252+
/** the max executing time of query in ms. Unit: millisecond */
251253
public void setTimeOut(long timeOut) {
252254
this.timeOut = timeOut;
253255
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,10 @@ public void transitionToCanceled() {
107107
transitionToDoneState(CANCELED);
108108
}
109109

110-
public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
110+
public boolean transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
111111
this.failureStatus.compareAndSet(null, failureStatus);
112112
this.failureException.compareAndSet(null, throwable);
113-
transitionToDoneState(CANCELED);
113+
return transitionToDoneState(CANCELED);
114114
}
115115

116116
public void transitionToAborted() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
3939
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
4040
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
41+
import org.apache.iotdb.db.queryengine.plan.Coordinator;
4142
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
4243
import org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory;
4344
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -435,6 +436,7 @@ private void cancelTimeoutFlushingInstances() {
435436
+ "ms, and now is in flushing state"));
436437
}
437438
});
439+
Coordinator.getInstance().cleanUpStaleQueries();
438440
}
439441

440442
public ExecutorService getIntoOperationExecutor() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.iotdb.db.auth.AuthorityChecker;
3636
import org.apache.iotdb.db.conf.IoTDBConfig;
3737
import org.apache.iotdb.db.conf.IoTDBDescriptor;
38+
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
3839
import org.apache.iotdb.db.exception.sql.SemanticException;
3940
import org.apache.iotdb.db.protocol.session.IClientSession;
4041
import org.apache.iotdb.db.protocol.session.PreparedStatementInfo;
@@ -805,15 +806,15 @@ public QueryId createQueryId() {
805806
}
806807

807808
public void cleanupQueryExecution(Long queryId, Supplier<String> contentSupplier, Throwable t) {
808-
IQueryExecution queryExecution = getQueryExecution(queryId);
809+
IQueryExecution queryExecution = queryExecutionMap.remove(queryId);
809810
if (queryExecution != null) {
810811
cleanupQueryExecutionInternal(queryId, queryExecution, contentSupplier, t);
811812
}
812813
}
813814

814815
public void cleanupQueryExecution(
815816
Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable t) {
816-
IQueryExecution queryExecution = getQueryExecution(queryId);
817+
IQueryExecution queryExecution = queryExecutionMap.remove(queryId);
817818
if (queryExecution != null) {
818819
Supplier<String> contentSupplier =
819820
new ContentOfQuerySupplier(nativeApiRequest, queryExecution);
@@ -898,6 +899,35 @@ public static void recordQueries(
898899
}
899900
}
900901

902+
/**
903+
* We need to reclaim resources from queries that have exceeded their timeout by more than one
904+
* minute. This indicates that the associated clients have failed to perform proper resource
905+
* cleanup.
906+
*/
907+
public void cleanUpStaleQueries() {
908+
long currentTime = System.currentTimeMillis();
909+
queryExecutionMap.forEach(
910+
(queryId, queryExecution) -> {
911+
if (queryExecution.isActive()) {
912+
return;
913+
}
914+
long timeout = queryExecution.getTimeout();
915+
long queryStartTime = queryExecution.getStartExecutionTime();
916+
long executeTime = currentTime - queryStartTime;
917+
if (timeout > 0 && executeTime - 60_000L > timeout) {
918+
LOGGER.warn(
919+
"Cleaning up stale query with id {}, which has been running for {} ms, timeout duration is: {}ms",
920+
queryId,
921+
executeTime,
922+
timeout);
923+
cleanupQueryExecution(
924+
queryId,
925+
(org.apache.thrift.TBase<?, ?>) null,
926+
new QueryTimeoutRuntimeException(queryStartTime, currentTime, timeout));
927+
}
928+
});
929+
}
930+
901931
public void cleanupQueryExecution(Long queryId) {
902932
cleanupQueryExecution(queryId, (org.apache.thrift.TBase<?, ?>) null, null);
903933
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.iotdb.db.queryengine.plan.Coordinator;
3737
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
3838
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
39+
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
3940
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
4041
import org.apache.iotdb.db.queryengine.plan.statement.internal.DeviceSchemaFetchStatement;
4142
import org.apache.iotdb.db.queryengine.plan.statement.internal.SeriesSchemaFetchStatement;
@@ -265,30 +266,38 @@ private ClusterSchemaTree executeSchemaFetchQuery(
265266
String.format("Fetch Schema failed, because %s", executionResult.status.getMessage()),
266267
executionResult.status.getCode());
267268
}
269+
IQueryExecution queryExecution = coordinator.getQueryExecution(queryId);
268270
try (SetThreadName ignored = new SetThreadName(executionResult.queryId.getId())) {
269271
ClusterSchemaTree result = new ClusterSchemaTree();
270272
ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer =
271273
new ClusterSchemaTree.SchemaNodeBatchDeserializer();
272274
Set<String> databaseSet = new HashSet<>();
273-
while (coordinator.getQueryExecution(queryId).hasNextResult()) {
274-
// The query will be transited to FINISHED when invoking getBatchResult() at the last time
275-
// So we don't need to clean up it manually
276-
Optional<TsBlock> tsBlock;
277-
try {
278-
tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
279-
} catch (IoTDBException e) {
280-
t = e;
281-
throw new QuerySchemaFetchFailedException(
282-
String.format("Fetch Schema failed: %s", e.getMessage()), e.getErrorCode());
283-
}
284-
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
285-
break;
286-
}
287-
Column column = tsBlock.get().getColumn(0);
288-
for (int i = 0; i < column.getPositionCount(); i++) {
289-
parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context);
275+
if (queryExecution != null) {
276+
while (queryExecution.hasNextResult()) {
277+
// The query will be transited to FINISHED when invoking getBatchResult() at the last
278+
// time
279+
// So we don't need to clean up it manually
280+
Optional<TsBlock> tsBlock;
281+
try {
282+
tsBlock = queryExecution.getBatchResult();
283+
} catch (IoTDBException e) {
284+
t = e;
285+
throw new QuerySchemaFetchFailedException(
286+
String.format("Fetch Schema failed: %s", e.getMessage()), e.getErrorCode());
287+
}
288+
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
289+
break;
290+
}
291+
Column column = tsBlock.get().getColumn(0);
292+
for (int i = 0; i < column.getPositionCount(); i++) {
293+
parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context);
294+
}
290295
}
296+
} else {
297+
throw new RuntimeException(
298+
String.format("Fetch Schema failed, because queryExecution is null for %s", queryId));
291299
}
300+
292301
result.setDatabases(databaseSet);
293302
return result;
294303
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ public interface IQueryExecution {
3535

3636
void stop(Throwable t);
3737

38-
void stopAndCleanup();
39-
4038
void stopAndCleanup(Throwable t);
4139

4240
void cancel();
@@ -61,15 +59,38 @@ public interface IQueryExecution {
6159

6260
String getQueryId();
6361

62+
// time unit is ms
6463
long getStartExecutionTime();
6564

65+
/**
66+
* @param executionTime time unit should be ns
67+
*/
6668
void recordExecutionTime(long executionTime);
6769

70+
/**
71+
* update current rpc start time, which is used to calculate rpc execution time and update total
72+
* execution time
73+
*
74+
* @param startTime start time of current rpc, time unit is ns
75+
*/
76+
void updateCurrentRpcStartTime(long startTime);
77+
78+
/**
79+
* Check if there is an active RPC for this query. If {@code startTimeOfCurrentRpc == -1}, it
80+
* means there is no active RPC, otherwise there is an active RPC. An active RPC means that the
81+
* client is still fetching results and the QueryExecution should not be cleaned up until the RPC
82+
* finishes. On the other hand, if there is no active RPC, it means that the client has finished
83+
* fetching results or has not started fetching results yet, and the QueryExecution can be safely
84+
* cleaned up.
85+
*/
86+
boolean isActive();
87+
6888
/**
6989
* @return cost time in ns
7090
*/
7191
long getTotalExecutionTime();
7292

93+
/** the max executing time of query in ms. Unit: millisecond */
7394
long getTimeout();
7495

7596
Optional<String> getExecuteSQL();

0 commit comments

Comments
 (0)