Skip to content

Commit 2b7b6cb

Browse files
authored
Load: Fixed multiple bugs (#17413)
* fix * load * Update ClusterConfigTaskExecutor.java * fix
1 parent 30a651a commit 2b7b6cb

10 files changed

Lines changed: 60 additions & 61 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java

Lines changed: 42 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -565,59 +565,52 @@ public void testSpecialPartialInsert() throws Exception {
565565

566566
@Test
567567
public void testTransferMods() {
568-
try {
569-
TestUtils.executeNonQueries(
570-
senderEnv,
571-
Arrays.asList(
572-
"create database root.sg_nonAligned",
573-
"create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0 with datatype=boolean, encoding=RLE,compressor=snappy",
574-
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1 with datatype=int32, encoding=PLAIN,compressor=LZ4",
575-
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2 with datatype=int64,encoding=gorilla,compressor=uncompressed",
576-
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3 with datatype=float,encoding=chimp,compressor=gzip",
577-
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4 with datatype=double,encoding=ts_2diff,compressor=zstd",
578-
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5 with datatype=text,encoding=dictionary,compressor=lzma2",
579-
"insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1, s2,s3,s4,s0,s5) values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条 device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5.39,85.51234,false,'第9条device_nonAligned'),(91,10,100,5.39,95.51234,false,'第10条device_nonAligned'),(101,11,110,5.39,105.51234,false,'第11条device_nonAligned'),(111,12,120,5.39,115.51234,false,'第12条device_nonAligned'),(121,13,130,5.39,125.51234,false,'第13条device_nonAligned'),(131,14,140,5.39,135.51234,false,'第14条device_nonAligned'),(141,15,150,5.39,145.51234,false,'第15条'),(151,16,160,5.39,155.51234,false,'第16条'),(161,17,170,5.39,165.51234,false,'第17条'),(171,18,180,5.39,175.51234,false,'第18条'),(181,19,190,5.39,185.51234,false,'第19条'),(191,20,200,5.39,195.51234,false,'第20条'),(201,21,210,5.39,null,false,'第21条')",
580-
"flush",
581-
"delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
582-
String.format(
583-
"create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
584-
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
568+
TestUtils.executeNonQueries(
569+
senderEnv,
570+
Arrays.asList(
571+
"create database root.sg_nonAligned",
572+
"create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0 with datatype=boolean, encoding=RLE,compressor=snappy",
573+
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1 with datatype=int32, encoding=PLAIN,compressor=LZ4",
574+
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2 with datatype=int64,encoding=gorilla,compressor=uncompressed",
575+
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3 with datatype=float,encoding=chimp,compressor=gzip",
576+
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4 with datatype=double,encoding=ts_2diff,compressor=zstd",
577+
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5 with datatype=text,encoding=dictionary,compressor=lzma2",
578+
"insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1, s2,s3,s4,s0,s5) values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条 device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5.39,85.51234,false,'第9条device_nonAligned'),(91,10,100,5.39,95.51234,false,'第10条device_nonAligned'),(101,11,110,5.39,105.51234,false,'第11条device_nonAligned'),(111,12,120,5.39,115.51234,false,'第12条device_nonAligned'),(121,13,130,5.39,125.51234,false,'第13条device_nonAligned'),(131,14,140,5.39,135.51234,false,'第14条device_nonAligned'),(141,15,150,5.39,145.51234,false,'第15条'),(151,16,160,5.39,155.51234,false,'第16条'),(161,17,170,5.39,165.51234,false,'第17条'),(171,18,180,5.39,175.51234,false,'第18条'),(181,19,190,5.39,185.51234,false,'第19条'),(191,20,200,5.39,195.51234,false,'第20条'),(201,21,210,5.39,null,false,'第21条')",
579+
"flush",
580+
"delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
581+
String.format(
582+
"create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
583+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
585584

586-
TestUtils.assertDataEventuallyOnEnv(
587-
receiverEnv,
588-
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
589-
"count(timeseries),",
590-
Collections.singleton("5,"));
585+
TestUtils.assertDataEventuallyOnEnv(
586+
receiverEnv,
587+
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
588+
"count(timeseries),",
589+
Collections.singleton("5,"));
591590

592-
TestUtils.executeNonQueries(
593-
senderEnv, Arrays.asList("drop pipe test_history", "drop pipe test_realtime"));
591+
TestUtils.executeNonQueries(
592+
senderEnv, Arrays.asList("drop pipe test_history", "drop pipe test_realtime"));
594593

595-
TestUtils.executeNonQuery(receiverEnv, "drop database root.**");
594+
TestUtils.executeNonQuery(receiverEnv, "drop database root.**");
596595

597-
TestUtils.executeNonQueries(
598-
senderEnv,
599-
Arrays.asList(
600-
"delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
601-
String.format(
602-
"create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
603-
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
596+
TestUtils.executeNonQueries(
597+
senderEnv,
598+
Arrays.asList(
599+
"delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
600+
String.format(
601+
"create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
602+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
604603

605-
TestUtils.assertDataEventuallyOnEnv(
606-
receiverEnv,
607-
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
608-
"count(timeseries),",
609-
Collections.singleton("4,"),
610-
15);
611-
TestUtils.assertDataAlwaysOnEnv(
612-
receiverEnv,
613-
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
614-
"count(timeseries),",
615-
Collections.singleton("4,"));
616-
} finally {
617-
TestUtils.executeNonQueries(
618-
senderEnv,
619-
Arrays.asList(
620-
"drop pipe test_history", "drop pipe test_realtime", "drop database root.**"));
621-
}
604+
TestUtils.assertDataEventuallyOnEnv(
605+
receiverEnv,
606+
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
607+
"count(timeseries),",
608+
Collections.singleton("4,"),
609+
15);
610+
TestUtils.assertDataAlwaysOnEnv(
611+
receiverEnv,
612+
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
613+
"count(timeseries),",
614+
Collections.singleton("4,"));
622615
}
623616
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,10 +387,10 @@ private void verifyTableDataTypeAndGenerateTagColumnMapper(
387387
&& (realColumn == null || !fileColumn.getType().equals(realColumn.getType()))) {
388388
LOGGER.debug(
389389
"Data type mismatch for column {} in table {}, type in TsFile: {}, type in IoTDB: {}",
390-
realColumn.getName(),
390+
fileColumn.getName(),
391391
realSchema.getTableName(),
392392
fileColumn.getType(),
393-
realColumn.getType());
393+
Objects.nonNull(realColumn) ? realColumn.getType() : null);
394394
}
395395
}
396396
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3514,6 +3514,7 @@ public SettableFuture<ConfigTaskResult> removeDataNode(
35143514
future.setException(
35153515
new IOException(
35163516
"The DataNode to be removed is not in the cluster, or the input format is incorrect."));
3517+
return future;
35173518
}
35183519

35193520
LOGGER.info("Starting to remove DataNode with nodeIds: {}", nodeIds);
@@ -3583,6 +3584,7 @@ public SettableFuture<ConfigTaskResult> removeConfigNode(
35833584
future.setException(
35843585
new IOException(
35853586
"The ConfigNode to be removed is not in the cluster, or the input format is incorrect."));
3587+
return future;
35863588
}
35873589

35883590
TConfigNodeLocation configNodeLocation = removeConfigNodeLocations.get(0);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ private List<WritePlanNode> splitByPartitionForTableModel(
152152
statement.isDeleteAfterLoad(),
153153
statement.getWritePointCount(i),
154154
needDecode4TimeColumn));
155+
} else {
156+
throw new IllegalStateException("LoadTsFile statement is null during table model split.");
155157
}
156158
}
157159
return res;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,10 @@ public void start() {
286286
final StringBuilder failedTsFiles =
287287
new StringBuilder(
288288
!tsFileNodeList.isEmpty()
289-
? tsFileNodeList.get(0).getTsFileResource().getTsFilePath()
289+
? tsFileNodeList
290+
.get(failedTsFileNodeIndexes.get(0))
291+
.getTsFileResource()
292+
.getTsFilePath()
290293
: "");
291294
final ListIterator<Integer> iterator = failedTsFileNodeIndexes.listIterator(1);
292295
while (iterator.hasNext()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNo
224224
}
225225
}
226226

227-
final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid));
227+
final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
228228
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
229229
try {
230230
final AtomicReference<Exception> exception = new AtomicReference<>();
@@ -304,7 +304,7 @@ public boolean loadAll(
304304
return false;
305305
}
306306

307-
final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid));
307+
final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
308308
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
309309
try {
310310
uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, timePartitionProgressIndexMap);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ public synchronized boolean isFilePendingOrLoading(final String file) {
7070
return loadingFileSet.contains(file) || pendingFileSet.contains(file);
7171
}
7272

73-
public int size() {
73+
public synchronized int size() {
7474
return pendingFileQueue.size() + loadingFileSet.size();
7575
}
7676

77-
public boolean isEmpty() {
77+
public synchronized boolean isEmpty() {
7878
return pendingFileQueue.isEmpty() && loadingFileSet.isEmpty();
7979
}
8080

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,14 @@ public static void validateSynonymParameters(final Map<String, String> parameter
8383

8484
public static void validateDatabaseLevelParam(final String databaseLevel) {
8585
try {
86-
int level = Integer.parseInt(databaseLevel);
86+
final int level = Integer.parseInt(databaseLevel);
8787
if (level < DATABASE_LEVEL_MIN_VALUE) {
8888
throw new SemanticException(
8989
String.format(
9090
"Given database level %d is less than the minimum value %d, please input a valid database level.",
9191
level, DATABASE_LEVEL_MIN_VALUE));
9292
}
93-
} catch (Exception e) {
93+
} catch (final NumberFormatException e) {
9494
throw new SemanticException(
9595
String.format(
9696
"Given database level %s is not a valid integer, please input a valid database level.",

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ synchronized void forceResize(LoadTsFileMemoryBlock memoryBlock, long newSizeInB
156156
try {
157157
forceAllocateFromQuery(bytesNeeded);
158158
if (LOGGER.isDebugEnabled()) {
159-
LOGGER.info(
159+
LOGGER.debug(
160160
"Load: Force resized LoadTsFileMemoryBlock with memory from query engine, size added: {}, new size: {}",
161161
bytesNeeded,
162162
newSizeInBytes);
@@ -180,7 +180,6 @@ public synchronized LoadTsFileDataCacheMemoryBlock allocateDataCacheMemoryBlock(
180180
final long actuallyAllocateMemoryInBytes =
181181
tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2);
182182
dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
183-
usedMemorySizeInBytes.addAndGet(actuallyAllocateMemoryInBytes);
184183
LOGGER.info(
185184
"Create Data Cache Memory Block {}, allocate memory {}",
186185
dataCacheMemoryBlock,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public void unbindFrom(AbstractMetricService metricService) {
159159
stage));
160160

161161
metricService.remove(
162-
MetricType.RATE,
162+
MetricType.COUNTER,
163163
Metric.LOAD_DISK_IO.toString(),
164164
Tag.NAME.toString(),
165165
String.valueOf(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()));

0 commit comments

Comments
 (0)