Skip to content

Commit 9fe80a4

Browse files
authored
Fix pipe consensus compatibility during rolling upgrade (#17428)
* Fix pipe consensus compatibility during rolling upgrade * Apply spotless formatting * Fix pipe consensus upgrade compatibility
1 parent 55082b2 commit 9fe80a4

7 files changed

Lines changed: 122 additions & 9 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.commons.exception.BadNodeUrlException;
2828
import org.apache.iotdb.commons.file.SystemPropertiesHandler;
2929
import org.apache.iotdb.commons.utils.NodeUrlUtils;
30+
import org.apache.iotdb.consensus.ConsensusFactory;
3031

3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
@@ -124,10 +125,15 @@ public static void checkSystemProperties() throws IOException {
124125
}
125126
}
126127

128+
// Only the data region protocol could have been persisted as the old PipeConsensus name
129+
// during a jar-only upgrade, so only that field needs compatibility normalization.
127130
// Consensus protocol configuration
131+
boolean needRewriteConsensusProtocol = false;
132+
128133
String configNodeConsensusProtocolClass =
129134
systemProperties.getProperty(CN_CONSENSUS_PROTOCOL, null);
130-
if (!configNodeConsensusProtocolClass.equals(conf.getConfigNodeConsensusProtocolClass())) {
135+
if (!Objects.equals(
136+
configNodeConsensusProtocolClass, conf.getConfigNodeConsensusProtocolClass())) {
131137
LOGGER.warn(
132138
format,
133139
CN_CONSENSUS_PROTOCOL,
@@ -136,9 +142,22 @@ public static void checkSystemProperties() throws IOException {
136142
conf.setConfigNodeConsensusProtocolClass(configNodeConsensusProtocolClass);
137143
}
138144

139-
String dataRegionConsensusProtocolClass =
145+
String persistedDataRegionConsensusProtocolClass =
140146
systemProperties.getProperty(DATA_CONSENSUS_PROTOCOL, null);
141-
if (!dataRegionConsensusProtocolClass.equals(conf.getDataRegionConsensusProtocolClass())) {
147+
String dataRegionConsensusProtocolClass =
148+
ConsensusFactory.normalizeConsensusProtocolClass(persistedDataRegionConsensusProtocolClass);
149+
if (!Objects.equals(
150+
persistedDataRegionConsensusProtocolClass, dataRegionConsensusProtocolClass)) {
151+
systemProperties.setProperty(DATA_CONSENSUS_PROTOCOL, dataRegionConsensusProtocolClass);
152+
needRewriteConsensusProtocol = true;
153+
LOGGER.warn(
154+
"[SystemProperties] Normalize {} from {} to {} for compatibility.",
155+
DATA_CONSENSUS_PROTOCOL,
156+
persistedDataRegionConsensusProtocolClass,
157+
dataRegionConsensusProtocolClass);
158+
}
159+
if (!Objects.equals(
160+
dataRegionConsensusProtocolClass, conf.getDataRegionConsensusProtocolClass())) {
142161
LOGGER.warn(
143162
format,
144163
DATA_CONSENSUS_PROTOCOL,
@@ -149,14 +168,18 @@ public static void checkSystemProperties() throws IOException {
149168

150169
String schemaRegionConsensusProtocolClass =
151170
systemProperties.getProperty(SCHEMA_CONSENSUS_PROTOCOL, null);
152-
if (!schemaRegionConsensusProtocolClass.equals(conf.getSchemaRegionConsensusProtocolClass())) {
171+
if (!Objects.equals(
172+
schemaRegionConsensusProtocolClass, conf.getSchemaRegionConsensusProtocolClass())) {
153173
LOGGER.warn(
154174
format,
155175
SCHEMA_CONSENSUS_PROTOCOL,
156176
conf.getSchemaRegionConsensusProtocolClass(),
157177
schemaRegionConsensusProtocolClass);
158178
conf.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass);
159179
}
180+
if (needRewriteConsensusProtocol) {
181+
systemPropertiesHandler.overwrite(systemProperties);
182+
}
160183

161184
// PartitionSlot configuration
162185
if (systemProperties.getProperty(SERIES_PARTITION_SLOT_NUM, null) != null) {
@@ -265,7 +288,9 @@ public static void storeSystemParameters() throws IOException {
265288
// Consensus protocol configuration
266289
systemProperties.setProperty(CN_CONSENSUS_PROTOCOL, conf.getConfigNodeConsensusProtocolClass());
267290
systemProperties.setProperty(
268-
DATA_CONSENSUS_PROTOCOL, conf.getDataRegionConsensusProtocolClass());
291+
DATA_CONSENSUS_PROTOCOL,
292+
ConsensusFactory.normalizeConsensusProtocolClass(
293+
conf.getDataRegionConsensusProtocolClass()));
269294
systemProperties.setProperty(
270295
SCHEMA_CONSENSUS_PROTOCOL, conf.getSchemaRegionConsensusProtocolClass());
271296

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public class ConsensusFactory {
3737
public static final String SIMPLE_CONSENSUS = "org.apache.iotdb.consensus.simple.SimpleConsensus";
3838
public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus";
3939
public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus";
40+
// Keep the pre-rename class name for stale system properties / snapshots restored after a
41+
// jar-only upgrade.
42+
public static final String LEGACY_IOT_CONSENSUS_V2 =
43+
"org.apache.iotdb.consensus.pipe.PipeConsensus";
4044
public static final String REAL_IOT_CONSENSUS_V2 =
4145
"org.apache.iotdb.consensus.pipe.IoTConsensusV2";
4246
public static final String IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.iot.IoTConsensusV2";
@@ -49,11 +53,24 @@ private ConsensusFactory() {
4953
throw new IllegalStateException("Utility class ConsensusFactory");
5054
}
5155

56+
// Downstream code compares against IOT_CONSENSUS_V2 directly, so persisted legacy names must be
57+
// normalized to the canonical constant before they fan out.
58+
public static String normalizeConsensusProtocolClass(String className) {
59+
if (className == null) {
60+
return null;
61+
}
62+
if (LEGACY_IOT_CONSENSUS_V2.equals(className) || REAL_IOT_CONSENSUS_V2.equals(className)) {
63+
return IOT_CONSENSUS_V2;
64+
}
65+
return className;
66+
}
67+
5268
public static Optional<IConsensus> getConsensusImpl(
5369
String className, ConsensusConfig config, IStateMachine.Registry registry) {
5470
try {
71+
className = normalizeConsensusProtocolClass(className);
5572
// special judge for IoTConsensusV2
56-
if (className.equals(IOT_CONSENSUS_V2)) {
73+
if (IOT_CONSENSUS_V2.equals(className)) {
5774
className = REAL_IOT_CONSENSUS_V2;
5875
// initialize iotConsensusV2's thrift component
5976
IoTV2GlobalComponentContainer.build();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,13 +282,33 @@ private void checkImmutableSystemProperties() throws IOException {
282282
if (properties.containsKey(CLUSTER_ID)) {
283283
config.setClusterId(properties.getProperty(CLUSTER_ID));
284284
}
285+
// Only the data region protocol could have been persisted as the old PipeConsensus name
286+
// during a jar-only upgrade, so only that field needs compatibility normalization.
287+
boolean needRewriteConsensusProtocol = false;
285288
if (properties.containsKey(SCHEMA_REGION_CONSENSUS_PROTOCOL)) {
286289
config.setSchemaRegionConsensusProtocolClass(
287290
properties.getProperty(SCHEMA_REGION_CONSENSUS_PROTOCOL));
288291
}
289292
if (properties.containsKey(DATA_REGION_CONSENSUS_PROTOCOL)) {
290-
config.setDataRegionConsensusProtocolClass(
291-
properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL));
293+
final String persistedDataRegionConsensusProtocolClass =
294+
properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL);
295+
final String dataRegionConsensusProtocolClass =
296+
ConsensusFactory.normalizeConsensusProtocolClass(
297+
persistedDataRegionConsensusProtocolClass);
298+
if (!Objects.equals(
299+
persistedDataRegionConsensusProtocolClass, dataRegionConsensusProtocolClass)) {
300+
properties.setProperty(DATA_REGION_CONSENSUS_PROTOCOL, dataRegionConsensusProtocolClass);
301+
needRewriteConsensusProtocol = true;
302+
logger.warn(
303+
"[SystemProperties] Normalize {} from {} to {} for compatibility.",
304+
DATA_REGION_CONSENSUS_PROTOCOL,
305+
persistedDataRegionConsensusProtocolClass,
306+
dataRegionConsensusProtocolClass);
307+
}
308+
config.setDataRegionConsensusProtocolClass(dataRegionConsensusProtocolClass);
309+
}
310+
if (needRewriteConsensusProtocol) {
311+
systemPropertiesHandler.overwrite(properties);
292312
}
293313
}
294314

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ protected void initConstructors() {
6969
pluginConstructors.put(
7070
BuiltinPipePlugin.IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName(),
7171
IoTConsensusV2Processor::new);
72+
// Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
73+
pluginConstructors.put(
74+
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
75+
IoTConsensusV2Processor::new);
7276
pluginConstructors.put(
7377
BuiltinPipePlugin.RENAME_DATABASE_PROCESSOR.getPipePluginName(),
7478
RenameDatabaseProcessor::new);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ protected void initConstructors() {
5656
pluginConstructors.put(
5757
BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName(),
5858
IoTConsensusV2AsyncSink::new);
59+
// Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
60+
pluginConstructors.put(
61+
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName(),
62+
IoTConsensusV2AsyncSink::new);
5963
pluginConstructors.put(
6064
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
6165
IoTDBLegacyPipeSink::new);
@@ -97,5 +101,9 @@ protected void initConstructors() {
97101
pluginConstructors.put(
98102
BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName(),
99103
IoTConsensusV2AsyncSink::new);
104+
// Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta.
105+
pluginConstructors.put(
106+
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(),
107+
IoTConsensusV2AsyncSink::new);
100108
}
101109
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
2727
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
2828
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
29+
import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
30+
import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink;
2931
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
3032
import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
3133
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -118,6 +120,20 @@ public void testPipePluginAgent() {
118120
}
119121
}))
120122
.getClass());
123+
Assert.assertEquals(
124+
IoTConsensusV2Processor.class,
125+
agent
126+
.dataRegion()
127+
.reflectProcessor(
128+
new PipeParameters(
129+
new HashMap<String, String>() {
130+
{
131+
put(
132+
PipeProcessorConstant.PROCESSOR_KEY,
133+
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName());
134+
}
135+
}))
136+
.getClass());
121137
Assert.assertEquals(
122138
IoTDBDataRegionAsyncSink.class,
123139
agent
@@ -132,5 +148,19 @@ public void testPipePluginAgent() {
132148
}
133149
}))
134150
.getClass());
151+
Assert.assertEquals(
152+
IoTConsensusV2AsyncSink.class,
153+
agent
154+
.dataRegion()
155+
.reflectSink(
156+
new PipeParameters(
157+
new HashMap<String, String>() {
158+
{
159+
put(
160+
PipeSinkConstant.CONNECTOR_KEY,
161+
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName());
162+
}
163+
}))
164+
.getClass());
135165
}
136166
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public enum BuiltinPipePlugin {
7474
STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class),
7575
TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class),
7676
IOT_CONSENSUS_V2_PROCESSOR("iot-consensus-v2-processor", IoTConsensusV2Processor.class),
77+
// Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
78+
PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", IoTConsensusV2Processor.class),
7779
RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class),
7880

7981
// connectors
@@ -86,6 +88,8 @@ public enum BuiltinPipePlugin {
8688
IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class),
8789
IOT_CONSENSUS_V2_ASYNC_CONNECTOR(
8890
"iot-consensus-v2-async-connector", IoTConsensusV2AsyncSink.class),
91+
// Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
92+
PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", IoTConsensusV2AsyncSink.class),
8993

9094
WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class),
9195
OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class),
@@ -105,6 +109,8 @@ public enum BuiltinPipePlugin {
105109
WRITE_BACK_SINK("write-back-sink", WriteBackSink.class),
106110
SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class),
107111
IOT_CONSENSUS_V2_ASYNC_SINK("iot-consensus-v2-async-sink", IoTConsensusV2AsyncSink.class),
112+
// Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename.
113+
PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", IoTConsensusV2AsyncSink.class),
108114
;
109115

110116
private final String pipePluginName;
@@ -158,6 +164,7 @@ public String getClassName() {
158164
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
159165
TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
160166
IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName().toUpperCase(),
167+
PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(),
161168
RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(),
162169
// Connectors
163170
DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
@@ -172,6 +179,7 @@ public String getClassName() {
172179
OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(),
173180
WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
174181
IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
182+
PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
175183
// Sinks
176184
IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
177185
IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
@@ -180,5 +188,6 @@ public String getClassName() {
180188
OPC_UA_SINK.getPipePluginName().toUpperCase(),
181189
OPC_DA_SINK.getPipePluginName().toUpperCase(),
182190
SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(),
183-
IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase())));
191+
IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase(),
192+
PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase())));
184193
}

0 commit comments

Comments
 (0)