Skip to content

Commit 3b02d32

Browse files
authored
Fix underflow exception caused by serialize function of DataPartitionTableIntegrityCheckProcedure (#17369)
1 parent 58ec38a commit 3b02d32

2 files changed

Lines changed: 209 additions & 2 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
3939
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
4040
import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
41+
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
4142
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
4243
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
4344
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
@@ -116,9 +117,9 @@ public class DataPartitionTableIntegrityCheckProcedure
116117
*/
117118
private Map<String, DataPartitionTable> finalDataPartitionTables;
118119

119-
private static Set<TDataNodeConfiguration> skipDataNodes =
120+
private Set<TDataNodeConfiguration> skipDataNodes =
120121
Collections.newSetFromMap(new ConcurrentHashMap<>());
121-
private static Set<TDataNodeConfiguration> failedDataNodes =
122+
private Set<TDataNodeConfiguration> failedDataNodes =
122123
Collections.newSetFromMap(new ConcurrentHashMap<>());
123124

124125
// ============Need serialize END=============/
@@ -181,6 +182,12 @@ protected void rollbackState(
181182
case MERGE_PARTITION_TABLES:
182183
finalDataPartitionTables.clear();
183184
break;
185+
case WRITE_PARTITION_TABLE_TO_CONSENSUS:
186+
allDataNodes.clear();
187+
earliestTimeslots.clear();
188+
dataPartitionTables.clear();
189+
finalDataPartitionTables.clear();
190+
break;
184191
default:
185192
allDataNodes.clear();
186193
earliestTimeslots.clear();
@@ -703,6 +710,7 @@ private void delayRollbackNextState(DataPartitionTableIntegrityCheckProcedureSta
703710

704711
@Override
705712
public void serialize(final DataOutputStream stream) throws IOException {
713+
stream.writeShort(ProcedureType.DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE.getTypeCode());
706714
super.serialize(stream);
707715

708716
// Serialize earliestTimeslots
@@ -969,4 +977,54 @@ private List<DatabaseScopedDataPartitionTable> deserializeDatabaseScopedTableLis
969977

970978
return result;
971979
}
980+
981+
public Map<String, Long> getEarliestTimeslots() {
982+
return earliestTimeslots;
983+
}
984+
985+
public Map<Integer, List<DatabaseScopedDataPartitionTable>> getDataPartitionTables() {
986+
return dataPartitionTables;
987+
}
988+
989+
public Set<String> getDatabasesWithLostDataPartition() {
990+
return databasesWithLostDataPartition;
991+
}
992+
993+
public Map<String, DataPartitionTable> getFinalDataPartitionTables() {
994+
return finalDataPartitionTables;
995+
}
996+
997+
public Set<TDataNodeConfiguration> getSkipDataNodes() {
998+
return skipDataNodes;
999+
}
1000+
1001+
public Set<TDataNodeConfiguration> getFailedDataNodes() {
1002+
return failedDataNodes;
1003+
}
1004+
1005+
public void setEarliestTimeslots(Map<String, Long> earliestTimeslots) {
1006+
this.earliestTimeslots = earliestTimeslots;
1007+
}
1008+
1009+
public void setDataPartitionTables(
1010+
Map<Integer, List<DatabaseScopedDataPartitionTable>> dataPartitionTables) {
1011+
this.dataPartitionTables = dataPartitionTables;
1012+
}
1013+
1014+
public void setDatabasesWithLostDataPartition(Set<String> databasesWithLostDataPartition) {
1015+
this.databasesWithLostDataPartition = databasesWithLostDataPartition;
1016+
}
1017+
1018+
public void setFinalDataPartitionTables(
1019+
Map<String, DataPartitionTable> finalDataPartitionTables) {
1020+
this.finalDataPartitionTables = finalDataPartitionTables;
1021+
}
1022+
1023+
public void setSkipDataNodes(Set<TDataNodeConfiguration> skipDataNodes) {
1024+
this.skipDataNodes = skipDataNodes;
1025+
}
1026+
1027+
public void setFailedDataNodes(Set<TDataNodeConfiguration> failedDataNodes) {
1028+
this.failedDataNodes = failedDataNodes;
1029+
}
9721030
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.procedure.impl.partition;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
23+
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
24+
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
25+
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
26+
import org.apache.iotdb.commons.partition.DataPartitionTable;
27+
import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable;
28+
import org.apache.iotdb.confignode.procedure.Procedure;
29+
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
30+
31+
import org.apache.tsfile.utils.PublicBAOS;
32+
import org.junit.Assert;
33+
import org.junit.Test;
34+
35+
import java.io.DataOutputStream;
36+
import java.io.IOException;
37+
import java.nio.ByteBuffer;
38+
import java.util.Collections;
39+
import java.util.HashMap;
40+
import java.util.HashSet;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.Set;
44+
45+
public class DataPartitionTableIntegrityCheckProcedureTest {
46+
@Test
47+
public void serDeTest() throws IOException {
48+
DataPartitionTableIntegrityCheckProcedure original = createTestProcedureWithData();
49+
50+
try (PublicBAOS baos = new PublicBAOS();
51+
DataOutputStream dos = new DataOutputStream(baos)) {
52+
53+
original.serialize(dos);
54+
55+
System.out.println("Serialized bytes length: " + baos.size());
56+
57+
ByteBuffer buffer = ByteBuffer.wrap(baos.getBuf(), 0, baos.size());
58+
59+
Procedure<?> recreated = ProcedureFactory.getInstance().create(buffer);
60+
61+
if (recreated instanceof DataPartitionTableIntegrityCheckProcedure) {
62+
DataPartitionTableIntegrityCheckProcedure actual =
63+
(DataPartitionTableIntegrityCheckProcedure) recreated;
64+
assertProcedureEquals(original, actual);
65+
System.out.println("All checked fields match!");
66+
} else {
67+
Assert.fail("Recreated is not DataPartitionTableIntegrityCheckProcedure");
68+
}
69+
}
70+
}
71+
72+
private DataPartitionTableIntegrityCheckProcedure createTestProcedureWithData() {
73+
DataPartitionTableIntegrityCheckProcedure proc =
74+
new DataPartitionTableIntegrityCheckProcedure();
75+
String database = "root.test";
76+
77+
Map<String, Long> earliestTimeslots = new HashMap<>();
78+
earliestTimeslots.put(database, 0L);
79+
proc.setEarliestTimeslots(earliestTimeslots);
80+
81+
Map<Integer, List<DatabaseScopedDataPartitionTable>> dataPartitionTables = new HashMap<>();
82+
DataPartitionTable dataPartitionTable = new DataPartitionTable();
83+
dataPartitionTables.put(
84+
1,
85+
Collections.singletonList(
86+
new DatabaseScopedDataPartitionTable(database, dataPartitionTable)));
87+
proc.setDataPartitionTables(dataPartitionTables);
88+
89+
Set<String> databasesWithLostDataPartition = new HashSet<>();
90+
databasesWithLostDataPartition.add(database);
91+
proc.setDatabasesWithLostDataPartition(databasesWithLostDataPartition);
92+
93+
Map<String, DataPartitionTable> finalDataPartitionTables = new HashMap<>();
94+
finalDataPartitionTables.put(database, dataPartitionTable);
95+
proc.setFinalDataPartitionTables(finalDataPartitionTables);
96+
97+
Set<TDataNodeConfiguration> skipNodes = getTDataNodeConfigurations(1);
98+
proc.setSkipDataNodes(skipNodes);
99+
100+
Set<TDataNodeConfiguration> failedNodes = getTDataNodeConfigurations(2);
101+
proc.setFailedDataNodes(failedNodes);
102+
103+
return proc;
104+
}
105+
106+
private static Set<TDataNodeConfiguration> getTDataNodeConfigurations(int dataNodeId) {
107+
Set<TDataNodeConfiguration> nodes = new HashSet<>();
108+
TDataNodeLocation tDataNodeConfiguration =
109+
new TDataNodeLocation(
110+
dataNodeId,
111+
new TEndPoint("127.0.0.1", 5),
112+
new TEndPoint("127.0.0.1", 6),
113+
new TEndPoint("127.0.0.1", 7),
114+
new TEndPoint("127.0.0.1", 8),
115+
new TEndPoint("127.0.0.1", 9));
116+
TNodeResource resource = new TNodeResource(16, 34359738368L);
117+
TDataNodeConfiguration skipDataNodeConfiguration =
118+
new TDataNodeConfiguration(tDataNodeConfiguration, resource);
119+
nodes.add(skipDataNodeConfiguration);
120+
return nodes;
121+
}
122+
123+
private void assertProcedureEquals(
124+
DataPartitionTableIntegrityCheckProcedure expected,
125+
DataPartitionTableIntegrityCheckProcedure actual) {
126+
Assert.assertEquals("procId mismatch", expected.getProcId(), actual.getProcId());
127+
Assert.assertEquals("state mismatch", expected.getState(), actual.getState());
128+
Assert.assertEquals(
129+
"earliestTimeslots mismatch",
130+
expected.getEarliestTimeslots(),
131+
actual.getEarliestTimeslots());
132+
Assert.assertEquals(
133+
"dataPartitionTables mismatch",
134+
expected.getDataPartitionTables(),
135+
actual.getDataPartitionTables());
136+
Assert.assertEquals(
137+
"databasesWithLostDataPartition mismatch",
138+
expected.getDatabasesWithLostDataPartition(),
139+
actual.getDatabasesWithLostDataPartition());
140+
Assert.assertEquals(
141+
"finalDataPartitionTables mismatch",
142+
expected.getFinalDataPartitionTables(),
143+
actual.getFinalDataPartitionTables());
144+
Assert.assertEquals(
145+
"skipDataNodes mismatch", expected.getSkipDataNodes(), actual.getSkipDataNodes());
146+
Assert.assertEquals(
147+
"failedDataNodes mismatch", expected.getFailedDataNodes(), actual.getFailedDataNodes());
148+
}
149+
}

0 commit comments

Comments
 (0)