Skip to content

Commit 7cde3c9

Browse files
authored
Pipe: Optimized the procedure waiting timeout for some time-consuming procedure & Construct the historical pipe when realtime pipe creation times out (#17404)
* fix * add
1 parent 67b017b commit 7cde3c9

2 files changed

Lines changed: 21 additions & 11 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ public TSStatus deleteTimeSeries(
388388
this.executor.submitProcedure(procedure);
389389
}
390390
}
391-
return waitingProcedureFinished(procedure);
391+
return waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT << 1);
392392
}
393393

394394
public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) {
@@ -1515,16 +1515,16 @@ public void createConsensusPipeAsync(TCreatePipeReq req) {
15151515

15161516
public TSStatus createPipe(TCreatePipeReq req) {
15171517
try {
1518-
CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
1518+
final CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
15191519
executor.submitProcedure(procedure);
1520-
TSStatus status = waitingProcedureFinished(procedure);
1520+
final TSStatus status = waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT << 1);
15211521
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
15221522
return status;
15231523
} else {
15241524
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
15251525
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
15261526
}
1527-
} catch (Exception e) {
1527+
} catch (final Exception e) {
15281528
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
15291529
}
15301530
}
@@ -1533,14 +1533,14 @@ public TSStatus alterPipe(final TAlterPipeReq req) {
15331533
try {
15341534
final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
15351535
executor.submitProcedure(procedure);
1536-
TSStatus status = waitingProcedureFinished(procedure);
1536+
final TSStatus status = waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT << 1);
15371537
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
15381538
return status;
15391539
} else {
15401540
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
15411541
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
15421542
}
1543-
} catch (Exception e) {
1543+
} catch (final Exception e) {
15441544
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
15451545
}
15461546
}
@@ -1624,9 +1624,9 @@ public void dropConsensusPipeAsync(String pipeName) {
16241624

16251625
public TSStatus dropPipe(String pipeName) {
16261626
try {
1627-
DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
1627+
final DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
16281628
executor.submitProcedure(procedure);
1629-
TSStatus status = waitingProcedureFinished(procedure);
1629+
final TSStatus status = waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT << 1);
16301630
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
16311631
return status;
16321632
} else {
@@ -1881,13 +1881,18 @@ private TSStatus waitingProcedureFinished(final long procedureId) {
18811881
return waitingProcedureFinished(executor.getProcedures().get(procedureId));
18821882
}
18831883

1884+
protected TSStatus waitingProcedureFinished(final Procedure<?> procedure) {
1885+
return waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT);
1886+
}
1887+
18841888
/**
18851889
* Waiting until the specific procedure finished.
18861890
*
18871891
* @param procedure The specific procedure
18881892
* @return TSStatus the running result of this procedure
18891893
*/
1890-
protected TSStatus waitingProcedureFinished(Procedure<?> procedure) {
1894+
protected TSStatus waitingProcedureFinished(
1895+
Procedure<?> procedure, final long procedureWaitRetryTimeout) {
18911896
if (procedure == null) {
18921897
LOGGER.error("Unexpected null procedure parameters for waitingProcedureFinished");
18931898
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
@@ -1896,7 +1901,7 @@ protected TSStatus waitingProcedureFinished(Procedure<?> procedure) {
18961901
final long startTimeForCurrentProcedure = System.currentTimeMillis();
18971902
while (executor.isRunning()
18981903
&& !executor.isFinished(procedure.getProcId())
1899-
&& System.currentTimeMillis() - startTimeForCurrentProcedure < PROCEDURE_WAIT_TIME_OUT) {
1904+
&& System.currentTimeMillis() - startTimeForCurrentProcedure < procedureWaitRetryTimeout) {
19001905
sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT);
19011906
}
19021907
if (!procedure.isFinished()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2232,7 +2232,12 @@ public SettableFuture<ConfigTaskResult> createPipe(
22322232

22332233
final TSStatus realtimeTsStatus = configNodeClient.createPipe(realtimeReq);
22342234
// If creation fails, immediately return with exception
2235-
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != realtimeTsStatus.getCode()) {
2235+
// If the procedure is still running, it's probably stuck on DataNode
2236+
// The pipe creation can ignore this situation and succeed, thus we do not need to skip in
2237+
// this case
2238+
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != realtimeTsStatus.getCode()
2239+
&& TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode()
2240+
!= realtimeTsStatus.getCode()) {
22362241
future.setException(new IoTDBException(realtimeTsStatus));
22372242
return future;
22382243
}

0 commit comments

Comments
 (0)