import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
@@ -69,14 +71,19 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
6971 protected static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300 ;
7072
7173 protected static final String INSERTION1 =
72- "INSERT INTO root.sg.d1(timestamp,speed,temperature) values (100, 1, 2)" ;
74+ "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (100, 1, 2, 3 )" ;
7375 protected static final String INSERTION2 =
74- "INSERT INTO root.sg.d1(timestamp,speed,temperature) values (101, 3, 4 )" ;
76+ "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (101, 4, 5, 6 )" ;
7577 protected static final String INSERTION3 =
76- "INSERT INTO root.sg.d1(timestamp,speed,temperature) values (102, 5, 6 )" ;
78+ "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (102, 7, 8, 9 )" ;
7779 protected static final String FLUSH_COMMAND = "flush on cluster" ;
7880 protected static final String COUNT_QUERY = "select count(*) from root.sg.**" ;
79- protected static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1" ;
81+ protected static final String SELECT_ALL_QUERY =
82+ "select speed, temperature, power from root.sg.d1" ;
83+ protected static final String DELETE_TIMESERIES_SPEED = "DELETE TIMESERIES root.sg.d1.speed" ;
84+ protected static final String SHOW_TIMESERIES_D1 = "SHOW TIMESERIES root.sg.d1.*" ;
85+ protected static final String SELECT_SURVIVING_QUERY =
86+ "SELECT temperature, power FROM root.sg.d1" ;
8087
8188 /**
8289 * Returns IoTConsensusV2 mode: {@link ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link
@@ -210,6 +217,187 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
210217 }
211218 }
212219
220+ /**
221+ * Test that DELETE TIMESERIES is properly replicated to all DataNode replicas via IoTConsensusV2.
222+ *
223+ * <p>This test reproduces the scenario from the historical deletion replication bug: when a
224+ * timeseries is deleted after data insertion (with some unflushed data), the deletion event must
225+ * be consistently replicated to all replicas. After waiting for replication to complete, stopping
226+ * each DataNode in turn should show the same schema on all surviving nodes.
227+ *
228+ * <p>Scenario:
229+ *
230+ * <ol>
231+ * <li>Insert data into root.sg.d1 with 3 measurements (speed, temperature, power), flush
232+ * <li>Insert more data (unflushed to create WAL-only entries)
233+ * <li>DELETE TIMESERIES root.sg.d1.speed
234+ * <li>Flush again to persist deletion
235+ * <li>Wait for replication to complete on all DataNodes
236+ * <li>Verify that every DataNode independently shows the same timeseries (speed is gone)
237+ * </ol>
238+ */
239+ public void testDeleteTimeSeriesReplicaConsistency () throws Exception {
240+ try (Connection connection = makeItCloseQuietly (EnvFactory .getEnv ().getConnection ());
241+ Statement statement = makeItCloseQuietly (connection .createStatement ())) {
242+
243+ // Step 1: Insert data with 3 measurements and flush
244+ LOGGER .info (
245+ "Step 1: Inserting data with 3 measurements and flushing (mode: {})..." ,
246+ getIoTConsensusV2Mode ());
247+ statement .execute (INSERTION1 );
248+ statement .execute (INSERTION2 );
249+ statement .execute (FLUSH_COMMAND );
250+
251+ // Step 2: Insert more data without flush (creates WAL-only entries)
252+ LOGGER .info ("Step 2: Inserting more data without flush (WAL-only entries)..." );
253+ statement .execute (INSERTION3 );
254+
255+ // Step 3: Delete one timeseries
256+ LOGGER .info ("Step 3: Deleting timeseries root.sg.d1.speed..." );
257+ statement .execute (DELETE_TIMESERIES_SPEED );
258+
259+ // Step 4: Flush again to persist the deletion
260+ LOGGER .info ("Step 4: Flushing to persist deletion..." );
261+ statement .execute (FLUSH_COMMAND );
262+
263+ // Verify on the current connection: speed should be gone, 2 timeseries remain
264+ verifyTimeSeriesAfterDelete (statement , "via initial connection" );
265+
266+ // Step 5: Wait for replication to complete on data region leaders
267+ LOGGER .info ("Step 5: Waiting for replication to complete on data region leaders..." );
268+ Map <Integer , Pair <Integer , Set <Integer >>> dataRegionMap =
269+ getDataRegionMapWithLeader (statement );
270+ Set <Integer > leaderNodeIds = new HashSet <>();
271+ for (Pair <Integer , Set <Integer >> leaderAndReplicas : dataRegionMap .values ()) {
272+ if (leaderAndReplicas .getLeft () > 0 ) {
273+ leaderNodeIds .add (leaderAndReplicas .getLeft ());
274+ }
275+ }
276+ for (int leaderNodeId : leaderNodeIds ) {
277+ EnvFactory .getEnv ()
278+ .dataNodeIdToWrapper (leaderNodeId )
279+ .ifPresent (this ::waitForReplicationComplete );
280+ }
281+
282+ // Step 6: Verify schema consistency on each DataNode independently
283+ LOGGER .info ("Step 6: Verifying schema consistency on each DataNode independently..." );
284+ List <DataNodeWrapper > dataNodeWrappers = EnvFactory .getEnv ().getDataNodeWrapperList ();
285+ for (DataNodeWrapper wrapper : dataNodeWrappers ) {
286+ String nodeDescription = "DataNode " + wrapper .getIp () + ":" + wrapper .getPort ();
287+ LOGGER .info ("Verifying schema on {}" , nodeDescription );
288+ Awaitility .await ()
289+ .atMost (60 , TimeUnit .SECONDS )
290+ .untilAsserted (
291+ () -> {
292+ try (Connection nodeConn =
293+ makeItCloseQuietly (
294+ EnvFactory .getEnv ()
295+ .getConnection (
296+ wrapper ,
297+ SessionConfig .DEFAULT_USER ,
298+ SessionConfig .DEFAULT_PASSWORD ,
299+ BaseEnv .TREE_SQL_DIALECT ));
300+ Statement nodeStmt = makeItCloseQuietly (nodeConn .createStatement ())) {
301+ verifyTimeSeriesAfterDelete (nodeStmt , nodeDescription );
302+ }
303+ });
304+ }
305+
306+ // Step 7: Stop each DataNode one by one and verify remaining nodes still consistent
307+ LOGGER .info (
308+ "Step 7: Stopping each DataNode in turn and verifying remaining nodes show consistent schema..." );
309+ for (DataNodeWrapper stoppedNode : dataNodeWrappers ) {
310+ String stoppedDesc = "DataNode " + stoppedNode .getIp () + ":" + stoppedNode .getPort ();
311+ LOGGER .info ("Stopping {}" , stoppedDesc );
312+ stoppedNode .stopForcibly ();
313+ Assert .assertFalse (stoppedDesc + " should be stopped" , stoppedNode .isAlive ());
314+
315+ try {
316+ // Verify schema on each surviving node
317+ for (DataNodeWrapper aliveNode : dataNodeWrappers ) {
318+ if (aliveNode == stoppedNode ) {
319+ continue ;
320+ }
321+ String aliveDesc = "DataNode " + aliveNode .getIp () + ":" + aliveNode .getPort ();
322+ Awaitility .await ()
323+ .pollDelay (1 , TimeUnit .SECONDS )
324+ .atMost (90 , TimeUnit .SECONDS )
325+ .untilAsserted (
326+ () -> {
327+ try (Connection aliveConn =
328+ makeItCloseQuietly (
329+ EnvFactory .getEnv ()
330+ .getConnection (
331+ aliveNode ,
332+ SessionConfig .DEFAULT_USER ,
333+ SessionConfig .DEFAULT_PASSWORD ,
334+ BaseEnv .TREE_SQL_DIALECT ));
335+ Statement aliveStmt = makeItCloseQuietly (aliveConn .createStatement ())) {
336+ verifyTimeSeriesAfterDelete (
337+ aliveStmt , aliveDesc + " (while " + stoppedDesc + " is down)" );
338+ }
339+ });
340+ }
341+ } finally {
342+ // Restart the stopped node before moving to the next iteration
343+ LOGGER .info ("Restarting {}" , stoppedDesc );
344+ stoppedNode .start ();
345+ // Wait for the restarted node to rejoin
346+ Awaitility .await ()
347+ .atMost (120 , TimeUnit .SECONDS )
348+ .pollInterval (2 , TimeUnit .SECONDS )
349+ .until (stoppedNode ::isAlive );
350+ }
351+ }
352+
353+ LOGGER .info (
354+ "DELETE TIMESERIES replica consistency test passed for mode: {}" ,
355+ getIoTConsensusV2Mode ());
356+ }
357+ }
358+
359+ /**
360+ * Verify that after deleting root.sg.d1.speed, only temperature and power timeseries remain, and
361+ * that data queries do not return the deleted timeseries.
362+ */
363+ private void verifyTimeSeriesAfterDelete (Statement statement , String context ) throws Exception {
364+ // Verify via SHOW TIMESERIES: speed should be gone, only temperature and power remain
365+ Set <String > timeseries = new HashSet <>();
366+ try (ResultSet resultSet = statement .executeQuery (SHOW_TIMESERIES_D1 )) {
367+ while (resultSet .next ()) {
368+ timeseries .add (resultSet .getString ("Timeseries" ));
369+ }
370+ }
371+ LOGGER .info ("[{}] SHOW TIMESERIES result: {}" , context , timeseries );
372+ Assert .assertEquals (
373+ "[" + context + "] Expected exactly 2 timeseries after delete (temperature, power)" ,
374+ 2 ,
375+ timeseries .size ());
376+ Assert .assertFalse (
377+ "[" + context + "] root.sg.d1.speed should have been deleted" ,
378+ timeseries .contains ("root.sg.d1.speed" ));
379+ Assert .assertTrue (
380+ "[" + context + "] root.sg.d1.temperature should still exist" ,
381+ timeseries .contains ("root.sg.d1.temperature" ));
382+ Assert .assertTrue (
383+ "[" + context + "] root.sg.d1.power should still exist" ,
384+ timeseries .contains ("root.sg.d1.power" ));
385+
386+ // Verify via SELECT: only temperature and power columns should return data
387+ try (ResultSet selectResult = statement .executeQuery (SELECT_SURVIVING_QUERY )) {
388+ int rowCount = 0 ;
389+ while (selectResult .next ()) {
390+ rowCount ++;
391+ }
392+ // After delete, remaining data depends on whether unflushed data for the deleted
393+ // timeseries was also cleaned up. We mainly verify that the query doesn't fail
394+ // and that some rows are returned for the surviving measurements.
395+ Assert .assertTrue (
396+ "[" + context + "] Expected at least 1 row from SELECT on surviving timeseries" ,
397+ rowCount >= 1 );
398+ }
399+ }
400+
213401 private static final Pattern SYNC_LAG_PATTERN =
214402 Pattern .compile ("iot_consensus_v2\\ {[^}]*type=\" syncLag\" [^}]*}\\ s+(\\ S+)" );
215403
@@ -259,7 +447,7 @@ protected void verifyDataConsistency(Statement statement) throws Exception {
259447 totalCount += parseLongFromString (countResult .getString (i ));
260448 }
261449 Assert .assertEquals (
262- "Expected 6 total data points (3 timestamps x 2 measurements)" , 6 , totalCount );
450+ "Expected 9 total data points (3 timestamps x 3 measurements)" , 9 , totalCount );
263451 }
264452
265453 int rowCount = 0 ;
@@ -269,15 +457,19 @@ protected void verifyDataConsistency(Statement statement) throws Exception {
269457 long timestamp = parseLongFromString (selectResult .getString (1 ));
270458 long speed = parseLongFromString (selectResult .getString (2 ));
271459 long temperature = parseLongFromString (selectResult .getString (3 ));
460+ long power = parseLongFromString (selectResult .getString (4 ));
272461 if (timestamp == 100 ) {
273462 Assert .assertEquals (1 , speed );
274463 Assert .assertEquals (2 , temperature );
464+ Assert .assertEquals (3 , power );
275465 } else if (timestamp == 101 ) {
276- Assert .assertEquals (3 , speed );
277- Assert .assertEquals (4 , temperature );
466+ Assert .assertEquals (4 , speed );
467+ Assert .assertEquals (5 , temperature );
468+ Assert .assertEquals (6 , power );
278469 } else if (timestamp == 102 ) {
279- Assert .assertEquals (5 , speed );
280- Assert .assertEquals (6 , temperature );
470+ Assert .assertEquals (7 , speed );
471+ Assert .assertEquals (8 , temperature );
472+ Assert .assertEquals (9 , power );
281473 }
282474 }
283475 }
0 commit comments