Skip to content

Commit fad2869

Browse files
authored
Fix recovered consensus pipes staying stopped after snapshot load (#17438)
* Fix recovered consensus pipes staying stopped after snapshot load
* Apply spotless formatting for consensus pipe recovery fix
1 parent cfce7d9 commit fad2869

2 files changed

Lines changed: 82 additions & 0 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -957,11 +957,36 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException {
957957
try (final FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
958958
pipeMetaKeeper.processLoadSnapshot(fileInputStream);
959959
}
960+
normalizeRecoveredConsensusPipeStatus();
960961
} finally {
961962
releaseWriteLock();
962963
}
963964
}
964965

966+
private void normalizeRecoveredConsensusPipeStatus() {
967+
final List<String> restartedConsensusPipes = new ArrayList<>();
968+
969+
pipeMetaKeeper
970+
.getPipeMetaList()
971+
.forEach(
972+
pipeMeta -> {
973+
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
974+
if (!PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType())
975+
|| !PipeStatus.STOPPED.equals(runtimeMeta.getStatus().get())
976+
|| runtimeMeta.getIsStoppedByRuntimeException()) {
977+
return;
978+
}
979+
980+
runtimeMeta.getStatus().set(PipeStatus.RUNNING);
981+
restartedConsensusPipes.add(pipeMeta.getStaticMeta().getPipeName());
982+
});
983+
984+
if (!restartedConsensusPipes.isEmpty()) {
985+
LOGGER.info(
986+
"Recovered consensus pipes {} as RUNNING during snapshot load.", restartedConsensusPipes);
987+
}
988+
}
989+
965990
/////////////////////////////// hashCode & equals ///////////////////////////////
966991

967992
@Override

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.junit.Before;
3434
import org.junit.Test;
3535

36+
import java.io.File;
3637
import java.util.HashMap;
3738
import java.util.Map;
3839
import java.util.concurrent.ConcurrentHashMap;
@@ -149,4 +150,60 @@ public void testGetConsensusPipeStatusMapExcludesSubscriptionPipes() {
149150
Assert.assertTrue(result.containsKey(consensusPipeName));
150151
Assert.assertFalse(result.containsKey(subscriptionPipeName));
151152
}
153+
154+
@Test
155+
public void testProcessLoadSnapshotRestartsOnlyHealthyStoppedConsensusPipes() throws Exception {
156+
DataRegionId regionId = new DataRegionId(100);
157+
String consensusPipeToRestart = new ConsensusPipeName(regionId, 1, 2).toString();
158+
String consensusPipeStoppedByException = new ConsensusPipeName(regionId, 2, 1).toString();
159+
String userPipeName = "userPipe";
160+
161+
createPipe(consensusPipeToRestart, PipeStatus.STOPPED);
162+
createPipe(consensusPipeStoppedByException, PipeStatus.STOPPED);
163+
createPipe(userPipeName, PipeStatus.STOPPED);
164+
165+
pipeTaskInfo
166+
.getPipeMetaByPipeName(consensusPipeStoppedByException)
167+
.getRuntimeMeta()
168+
.setIsStoppedByRuntimeException(true);
169+
170+
final File snapshotDir =
171+
java.nio.file.Files.createTempDirectory("pipe-task-info-consensus-test").toFile();
172+
try {
173+
Assert.assertTrue(pipeTaskInfo.processTakeSnapshot(snapshotDir));
174+
175+
PipeTaskInfo recoveredPipeTaskInfo = new PipeTaskInfo();
176+
recoveredPipeTaskInfo.processLoadSnapshot(snapshotDir);
177+
178+
Assert.assertEquals(
179+
PipeStatus.RUNNING,
180+
recoveredPipeTaskInfo
181+
.getPipeMetaByPipeName(consensusPipeToRestart)
182+
.getRuntimeMeta()
183+
.getStatus()
184+
.get());
185+
Assert.assertEquals(
186+
PipeStatus.STOPPED,
187+
recoveredPipeTaskInfo
188+
.getPipeMetaByPipeName(consensusPipeStoppedByException)
189+
.getRuntimeMeta()
190+
.getStatus()
191+
.get());
192+
Assert.assertTrue(
193+
recoveredPipeTaskInfo
194+
.getPipeMetaByPipeName(consensusPipeStoppedByException)
195+
.getRuntimeMeta()
196+
.getIsStoppedByRuntimeException());
197+
Assert.assertEquals(
198+
PipeStatus.STOPPED,
199+
recoveredPipeTaskInfo
200+
.getPipeMetaByPipeName(userPipeName)
201+
.getRuntimeMeta()
202+
.getStatus()
203+
.get());
204+
} finally {
205+
new File(snapshotDir, "pipe_task_info.bin").delete();
206+
snapshotDir.delete();
207+
}
208+
}
152209
}

0 commit comments

Comments
 (0)