Skip to content

Commit 6b33dea

Browse files
authored
Pipe: Fixed the bug that mod may not be released in historical pipe (#17379)
* z * shop * fix * fix * Update IoTDBPipeDataSinkIT.java
1 parent 35326e8 commit 6b33dea

3 files changed

Lines changed: 73 additions & 22 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public void setUp() {
6565
protected void setupConfig() {
6666
super.setupConfig();
6767
senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
68+
senderEnv.getConfig().getCommonConfig().setPipeAutoSplitFullEnabled(true);
6869
}
6970

7071
@Test
@@ -105,9 +106,6 @@ public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {
105106

106107
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
107108

108-
Assert.assertEquals(
109-
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
110-
111109
// Do not fail if the failure has nothing to do with pipe
112110
// Because the failures will randomly generate due to resource limitation
113111
TestUtils.executeNonQueries(
@@ -172,9 +170,6 @@ private void testSinkFormat(final String format) throws Exception {
172170
.setProcessorAttributes(processorAttributes))
173171
.getCode());
174172

175-
Assert.assertEquals(
176-
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
177-
178173
// Do not fail if the failure has nothing to do with pipe
179174
// Because the failures will randomly generate due to resource limitation
180175
TestUtils.executeNonQueries(
@@ -236,9 +231,6 @@ public void testLegacySink() throws Exception {
236231

237232
try (final SyncConfigNodeIServiceClient client =
238233
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
239-
Assert.assertEquals(
240-
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
241-
242234
// Do not fail if the failure has nothing to do with pipe
243235
// Because the failures will randomly generate due to resource limitation
244236
TestUtils.executeNonQueries(
@@ -416,9 +408,6 @@ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws Exce
416408
.setProcessorAttributes(processorAttributes))
417409
.getCode());
418410

419-
Assert.assertEquals(
420-
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
421-
422411
// Do not fail if the failure has nothing to do with pipe
423412
// Because the failures will randomly generate due to resource limitation
424413
TestUtils.executeNonQueries(
@@ -512,9 +501,6 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws
512501
.setProcessorAttributes(processorAttributes))
513502
.getCode());
514503

515-
Assert.assertEquals(
516-
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
517-
518504
TestUtils.executeNonQueries(
519505
senderEnv,
520506
Arrays.asList("insert into root.vehicle.d0(time, s1) values (2, 1)", "flush"),
@@ -576,4 +562,62 @@ public void testSpecialPartialInsert() throws Exception {
576562
"1635232151960,null,null,2.0,2.1,null,",
577563
"1635232143960,6.0,4.0,null,null,null,")));
578564
}
565+
566+
@Test
567+
public void testTransferMods() {
568+
try {
569+
TestUtils.executeNonQueries(
570+
senderEnv,
571+
Arrays.asList(
572+
"create database root.sg_nonAligned",
573+
"create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0 with datatype=boolean, encoding=RLE,compressor=snappy",
574+
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1 with datatype=int32, encoding=PLAIN,compressor=LZ4",
575+
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2 with datatype=int64,encoding=gorilla,compressor=uncompressed",
576+
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3 with datatype=float,encoding=chimp,compressor=gzip",
577+
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4 with datatype=double,encoding=ts_2diff,compressor=zstd",
578+
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5 with datatype=text,encoding=dictionary,compressor=lzma2",
579+
"insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1, s2,s3,s4,s0,s5) values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条 device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5.39,85.51234,false,'第9条device_nonAligned'),(91,10,100,5.39,95.51234,false,'第10条device_nonAligned'),(101,11,110,5.39,105.51234,false,'第11条device_nonAligned'),(111,12,120,5.39,115.51234,false,'第12条device_nonAligned'),(121,13,130,5.39,125.51234,false,'第13条device_nonAligned'),(131,14,140,5.39,135.51234,false,'第14条device_nonAligned'),(141,15,150,5.39,145.51234,false,'第15条'),(151,16,160,5.39,155.51234,false,'第16条'),(161,17,170,5.39,165.51234,false,'第17条'),(171,18,180,5.39,175.51234,false,'第18条'),(181,19,190,5.39,185.51234,false,'第19条'),(191,20,200,5.39,195.51234,false,'第20条'),(201,21,210,5.39,null,false,'第21条')",
580+
"flush",
581+
"delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
582+
String.format(
583+
"create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
584+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
585+
586+
TestUtils.assertDataEventuallyOnEnv(
587+
receiverEnv,
588+
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
589+
"count(timeseries),",
590+
Collections.singleton("5,"));
591+
592+
TestUtils.executeNonQueries(
593+
senderEnv, Arrays.asList("drop pipe test_history", "drop pipe test_realtime"));
594+
595+
TestUtils.executeNonQuery(receiverEnv, "drop database root.**");
596+
597+
TestUtils.executeNonQueries(
598+
senderEnv,
599+
Arrays.asList(
600+
"delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
601+
String.format(
602+
"create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
603+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
604+
605+
TestUtils.assertDataEventuallyOnEnv(
606+
receiverEnv,
607+
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
608+
"count(timeseries),",
609+
Collections.singleton("4,"),
610+
15);
611+
TestUtils.assertDataAlwaysOnEnv(
612+
receiverEnv,
613+
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
614+
"count(timeseries),",
615+
Collections.singleton("4,"));
616+
} finally {
617+
TestUtils.executeNonQueries(
618+
senderEnv,
619+
Arrays.asList(
620+
"drop pipe test_history", "drop pipe test_realtime", "drop database root.**"));
621+
}
622+
}
579623
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -355,13 +355,18 @@ public void pinTsFileResource(
355355
}
356356
}
357357

358-
public void unpinTsFileResource(final TsFileResource resource, final @Nullable String pipeName)
358+
public void unpinTsFileResource(
359+
final TsFileResource resource,
360+
final boolean shouldTransferModFile,
361+
final @Nullable String pipeName)
359362
throws IOException {
360-
final File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile(), pipeName);
361-
decreaseFileReference(pinnedFile, pipeName);
363+
decreaseFileReference(
364+
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile(), pipeName), pipeName);
362365

363-
if (resource.sharedModFileExists()) {
364-
decreaseFileReference(resource.getSharedModFile().getFile(), pipeName);
366+
if (shouldTransferModFile && resource.exclusiveModFileExists()) {
367+
decreaseFileReference(
368+
getHardlinkOrCopiedFileInPipeDir(resource.getExclusiveModFile().getFile(), pipeName),
369+
pipeName);
365370
}
366371
}
367372

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,8 @@ private Event supplyTsFileEvent(final TsFileResource resource) {
904904
return isReferenceCountIncreased ? event : null;
905905
} finally {
906906
try {
907-
PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource, pipeName);
907+
PipeDataNodeResourceManager.tsfile()
908+
.unpinTsFileResource(resource, shouldTransferModFile, pipeName);
908909
} catch (final IOException e) {
909910
LOGGER.warn(
910911
"Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}",
@@ -989,7 +990,8 @@ public synchronized void close() {
989990
if (resource instanceof TsFileResource) {
990991
try {
991992
PipeDataNodeResourceManager.tsfile()
992-
.unpinTsFileResource((TsFileResource) resource, pipeName);
993+
.unpinTsFileResource(
994+
(TsFileResource) resource, shouldTransferModFile, pipeName);
993995
} catch (final IOException e) {
994996
LOGGER.warn(
995997
"Pipe {}@{}: failed to unpin TsFileResource after dropping pipe, original path: {}",

0 commit comments

Comments
 (0)