Skip to content

Commit 9bfe0b0

Browse files
authored
Pipe: Made the historical pipe split auto dropped after completion (#17295)
* snapshot * may-comp * auto
1 parent 6b33dea commit 9bfe0b0

2 files changed

Lines changed: 70 additions & 55 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java

Lines changed: 68 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
2525
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
2626
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
27+
import org.apache.iotdb.consensus.ConsensusFactory;
2728
import org.apache.iotdb.db.it.utils.TestUtils;
2829
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
2930
import org.apache.iotdb.it.framework.IoTDBTestRunner;
3031
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
32+
import org.apache.iotdb.itbase.env.BaseEnv;
3133
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
3234
import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
3335
import org.apache.iotdb.rpc.TSStatusCode;
@@ -40,6 +42,7 @@
4042

4143
import java.sql.Connection;
4244
import java.sql.ResultSet;
45+
import java.sql.SQLException;
4346
import java.sql.Statement;
4447
import java.util.Collections;
4548
import java.util.HashMap;
@@ -49,6 +52,7 @@
4952

5053
import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
5154
import static org.awaitility.Awaitility.await;
55+
import static org.junit.Assert.fail;
5256

5357
@RunWith(IoTDBTestRunner.class)
5458
@Category({MultiClusterIT2DualTableManualEnhanced.class})
@@ -60,6 +64,34 @@ public void setUp() {
6064
super.setUp();
6165
}
6266

67+
protected void setupConfig() {
68+
// Enable auto split
69+
senderEnv
70+
.getConfig()
71+
.getCommonConfig()
72+
.setAutoCreateSchemaEnabled(true)
73+
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
74+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
75+
.setEnforceStrongPassword(false)
76+
.setPipeMemoryManagementEnabled(false)
77+
.setIsPipeEnableMemoryCheck(false);
78+
receiverEnv
79+
.getConfig()
80+
.getCommonConfig()
81+
.setAutoCreateSchemaEnabled(true)
82+
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
83+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
84+
.setEnforceStrongPassword(false)
85+
.setPipeMemoryManagementEnabled(false)
86+
.setIsPipeEnableMemoryCheck(false);
87+
88+
// 10 min, assert that the operations will not time out
89+
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
90+
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
91+
92+
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
93+
}
94+
6395
@Test
6496
public void testAutoDropInHistoricalTransfer() throws Exception {
6597
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -70,48 +102,29 @@ public void testAutoDropInHistoricalTransfer() throws Exception {
70102
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
71103
};
72104

73-
final String receiverIp = receiverDataNode.getIp();
74-
final int receiverPort = receiverDataNode.getPort();
75-
76-
try (final SyncConfigNodeIServiceClient client =
77-
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
78-
79-
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
80-
TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
81-
82-
final Map<String, String> extractorAttributes = new HashMap<>();
83-
final Map<String, String> processorAttributes = new HashMap<>();
84-
final Map<String, String> connectorAttributes = new HashMap<>();
85-
86-
extractorAttributes.put("mode.snapshot", "true");
87-
extractorAttributes.put("capture.table", "true");
88-
extractorAttributes.put("user", "root");
89-
90-
connectorAttributes.put("connector", "iotdb-thrift-connector");
91-
connectorAttributes.put("connector.batch.enable", "false");
92-
connectorAttributes.put("connector.ip", receiverIp);
93-
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
94-
95-
final TSStatus status =
96-
client.createPipe(
97-
new TCreatePipeReq("p1", connectorAttributes)
98-
.setExtractorAttributes(extractorAttributes)
99-
.setProcessorAttributes(processorAttributes));
105+
// Create an ordinary full sync pipe
106+
final String sql =
107+
String.format("create pipe a2b ('node-urls'='%s')", receiverDataNode.getIpAndPortString());
108+
try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
109+
final Statement statement = connection.createStatement()) {
110+
statement.execute(sql);
111+
} catch (SQLException e) {
112+
fail(e.getMessage());
113+
}
100114

101-
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
102-
Assert.assertEquals(
103-
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
115+
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
116+
TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
104117

105-
TestUtils.assertDataEventuallyOnEnv(
106-
receiverEnv,
107-
TableModelUtils.getQueryCountSql("test"),
108-
"_col0,",
109-
Collections.singleton("100,"),
110-
"test",
111-
handleFailure);
112-
}
118+
TestUtils.assertDataEventuallyOnEnv(
119+
receiverEnv,
120+
TableModelUtils.getQueryCountSql("test"),
121+
"_col0,",
122+
Collections.singleton("100,"),
123+
"test",
124+
handleFailure);
113125

114-
try (final Connection connection = makeItCloseQuietly(senderEnv.getConnection());
126+
try (final Connection connection =
127+
makeItCloseQuietly(senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT));
115128
final Statement statement = makeItCloseQuietly(connection.createStatement()); ) {
116129
ResultSet result = statement.executeQuery("show pipes");
117130
await()
@@ -124,9 +137,9 @@ public void testAutoDropInHistoricalTransfer() throws Exception {
124137
try {
125138
int pipeNum = 0;
126139
while (result.next()) {
127-
if (!result
128-
.getString(ColumnHeaderConstant.ID)
129-
.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
140+
final String pipeName = result.getString(ColumnHeaderConstant.ID);
141+
if (!pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
142+
&& pipeName.endsWith("_history")) {
130143
pipeNum++;
131144
}
132145
}
@@ -157,25 +170,25 @@ public void testAutoDropInHistoricalTransferWithTimeRange() throws Exception {
157170
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
158171
TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
159172

160-
final Map<String, String> extractorAttributes = new HashMap<>();
173+
final Map<String, String> sourceAttributes = new HashMap<>();
161174
final Map<String, String> processorAttributes = new HashMap<>();
162-
final Map<String, String> connectorAttributes = new HashMap<>();
175+
final Map<String, String> sinkAttributes = new HashMap<>();
163176

164-
extractorAttributes.put("mode.snapshot", "true");
165-
extractorAttributes.put("capture.table", "true");
166-
extractorAttributes.put("start-time", "0");
167-
extractorAttributes.put("end-time", "49");
168-
extractorAttributes.put("user", "root");
177+
sourceAttributes.put("mode.snapshot", "true");
178+
sourceAttributes.put("capture.table", "true");
179+
sourceAttributes.put("start-time", "0");
180+
sourceAttributes.put("end-time", "49");
181+
sourceAttributes.put("user", "root");
169182

170-
connectorAttributes.put("connector", "iotdb-thrift-connector");
171-
connectorAttributes.put("connector.batch.enable", "false");
172-
connectorAttributes.put("connector.ip", receiverIp);
173-
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
183+
sinkAttributes.put("sink", "iotdb-thrift-sink");
184+
sinkAttributes.put("sink.batch.enable", "false");
185+
sinkAttributes.put("sink.ip", receiverIp);
186+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
174187

175188
final TSStatus status =
176189
client.createPipe(
177-
new TCreatePipeReq("p1", connectorAttributes)
178-
.setExtractorAttributes(extractorAttributes)
190+
new TCreatePipeReq("p1", sinkAttributes)
191+
.setExtractorAttributes(sourceAttributes)
179192
.setProcessorAttributes(processorAttributes));
180193

181194
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2253,6 +2253,8 @@ public SettableFuture<ConfigTaskResult> createPipe(
22532253
Boolean.toString(false),
22542254
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
22552255
Boolean.toString(true),
2256+
PipeSourceConstant.EXTRACTOR_MODE_KEY,
2257+
PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE,
22562258
// We force the historical pipe to transfer data (and maybe
22572259
// deletion) only
22582260
// Thus we can transfer schema only once

0 commit comments

Comments
 (0)