Skip to content

Commit 35ef0c8

Browse files
authored
Fix deadlock between DataNode createDataRegion and ConfigNode PipeTaskCoordinatorLock by delegating consensus pipe lifecycle to ConfigNode (#17233)
* Fix deadlock between DataNode createDataRegion and ConfigNode PipeTaskCoordinatorLock by delegating the consensus pipe lifecycle to ConfigNode
* Let the ConfigNode (CN) read the IoTConsensusV2 replication mode
* Remove duplicated code
* Apply spotless formatting
* Fix the consensus pipe not being created when a data region is initially created
* Create pipes asynchronously to avoid an audit-log deadlock
* Fix PipeTaskCoordinator not being shareable across threads
* Fix a data race when the DataNode (DN) and CN restart at the same time, and fix a DN whose region was removed failing to clear the region files
* Refine code
* Remove useless comment
* unlock() for PipeTaskCoordinator no longer returns a value
1 parent 6ec3a51 commit 35ef0c8

22 files changed

Lines changed: 426 additions & 645 deletions

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ public class ConfigNodeConfig {
7777
/** Data region consensus protocol. */
7878
private String dataRegionConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS;
7979

80+
/** IoTConsensusV2 replicate mode: "batch" or "stream". */
81+
private String iotConsensusV2Mode = "batch";
82+
8083
/** Default number of DataRegion replicas. */
8184
private int dataReplicationFactor = 1;
8285

@@ -530,6 +533,14 @@ public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtoc
530533
this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass;
531534
}
532535

536+
public String getIotConsensusV2Mode() {
537+
return iotConsensusV2Mode;
538+
}
539+
540+
public void setIotConsensusV2Mode(String iotConsensusV2Mode) {
541+
this.iotConsensusV2Mode = iotConsensusV2Mode;
542+
}
543+
533544
public int getDataRegionPerDataNode() {
534545
return dataRegionPerDataNode;
535546
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio
205205
properties.getProperty(
206206
"data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass()));
207207

208+
conf.setIotConsensusV2Mode(
209+
properties.getProperty("iot_consensus_v2_mode", conf.getIotConsensusV2Mode()));
210+
208211
conf.setDataReplicationFactor(
209212
Integer.parseInt(
210213
properties.getProperty(

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1485,6 +1485,23 @@ public TSStatus createConsensusPipe(TCreatePipeReq req) {
14851485
}
14861486
}
14871487

1488+
/**
1489+
* Submit a consensus pipe creation procedure without blocking. The procedure will execute in the
1490+
* background. Failures are logged and can be repaired by the consensus pipe guardian.
1491+
*/
1492+
public void createConsensusPipeAsync(TCreatePipeReq req) {
1493+
try {
1494+
CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
1495+
executor.submitProcedure(procedure);
1496+
LOGGER.info("Submitted async consensus pipe creation: {}", req.getPipeName());
1497+
} catch (Exception e) {
1498+
LOGGER.warn(
1499+
"Failed to submit async consensus pipe creation for {}: {}",
1500+
req.getPipeName(),
1501+
e.getMessage());
1502+
}
1503+
}
1504+
14881505
public TSStatus createPipe(TCreatePipeReq req) {
14891506
try {
14901507
CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
@@ -1579,6 +1596,21 @@ public TSStatus dropConsensusPipe(String pipeName) {
15791596
}
15801597
}
15811598

1599+
/**
1600+
* Submit a consensus pipe drop procedure without blocking. The procedure will execute in the
1601+
* background. Failures are logged and can be repaired by the consensus pipe guardian.
1602+
*/
1603+
public void dropConsensusPipeAsync(String pipeName) {
1604+
try {
1605+
DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
1606+
executor.submitProcedure(procedure);
1607+
LOGGER.info("Submitted async consensus pipe drop: {}", pipeName);
1608+
} catch (Exception e) {
1609+
LOGGER.warn(
1610+
"Failed to submit async consensus pipe drop for {}: {}", pipeName, e.getMessage());
1611+
}
1612+
}
1613+
15821614
public TSStatus dropPipe(String pipeName) {
15831615
try {
15841616
DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -85,19 +85,9 @@ public AtomicReference<PipeTaskInfo> lock() {
8585
/**
8686
* Unlock the pipe task coordinator. Calling this method will clear the pipe task info holder,
8787
* which means that the holder will be null after calling this method.
88-
*
89-
* @return {@code true} if successfully unlocked, {@code false} if current thread is not holding
90-
* the lock.
9188
*/
92-
public boolean unlock() {
93-
try {
94-
pipeTaskCoordinatorLock.unlock();
95-
return true;
96-
} catch (IllegalMonitorStateException ignored) {
97-
// This is thrown if unlock() is called without lock() called first.
98-
LOGGER.warn("This thread is not holding the lock.");
99-
return false;
100-
}
89+
public void unlock() {
90+
pipeTaskCoordinatorLock.unlock();
10191
}
10292

10393
public boolean isLocked() {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,29 @@
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
2424

25+
import java.util.concurrent.Semaphore;
2526
import java.util.concurrent.TimeUnit;
26-
import java.util.concurrent.locks.ReentrantLock;
2727

2828
/**
29-
* {@link PipeTaskCoordinatorLock} is a cross thread lock for pipe task coordinator. It is used to
29+
* {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task coordinator. It is used to
3030
* ensure that only one thread can execute the pipe task coordinator at the same time.
31+
*
32+
* <p>Uses {@link Semaphore} instead of {@link java.util.concurrent.locks.ReentrantLock} to support
33+
* cross-thread acquire/release, which is required by the procedure recovery mechanism: locks may be
34+
* acquired on the StateMachineUpdater thread during {@code restoreLock()} and released on a
35+
* ProcedureCoreWorker thread after execution.
3136
*/
3237
public class PipeTaskCoordinatorLock {
3338

3439
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
3540

36-
private final ReentrantLock lock = new ReentrantLock();
41+
private final Semaphore semaphore = new Semaphore(1);
3742

3843
public void lock() {
3944
LOGGER.debug(
4045
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
4146
try {
42-
lock.lockInterruptibly();
47+
semaphore.acquire();
4348
LOGGER.debug(
4449
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
4550
} catch (final InterruptedException e) {
@@ -54,7 +59,7 @@ public boolean tryLock() {
5459
try {
5560
LOGGER.debug(
5661
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
57-
if (lock.tryLock(10, TimeUnit.SECONDS)) {
62+
if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
5863
LOGGER.debug(
5964
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
6065
return true;
@@ -74,12 +79,12 @@ public boolean tryLock() {
7479
}
7580

7681
public void unlock() {
77-
lock.unlock();
82+
semaphore.release();
7883
LOGGER.debug(
7984
"PipeTaskCoordinator lock released by thread {}", Thread.currentThread().getName());
8085
}
8186

8287
public boolean isLocked() {
83-
return lock.isLocked();
88+
return semaphore.availablePermits() == 0;
8489
}
8590
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,8 @@ public boolean unlock() {
105105
subscriptionInfoHolder = null;
106106
}
107107

108-
try {
109-
coordinatorLock.unlock();
110-
return true;
111-
} catch (IllegalMonitorStateException ignored) {
112-
// This is thrown if unlock() is called without lock() called first.
113-
LOGGER.warn("This thread is not holding the lock.");
114-
return false;
115-
}
108+
coordinatorLock.unlock();
109+
return true;
116110
}
117111

118112
public boolean isLocked() {

0 commit comments

Comments
 (0)