Original file line number Diff line number Diff line change
@@ -81,6 +81,8 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
private File tsFile;
private long extractTime = 0;

// Whether this event should transfer mod files if they exist.
private boolean shouldTransferModFile;
// This is true iff the modFile exists and should be transferred
private boolean isWithMod;
private File modFile;
@@ -174,8 +176,8 @@ public PipeTsFileInsertionEvent(
// hard-link it to each pipe dir
this.tsFile = Objects.isNull(tsFile) ? resource.getTsFile() : tsFile;

- this.isWithMod = isWithMod && resource.anyModFileExists();
- this.modFile = this.isWithMod ? resource.getExclusiveModFile().getFile() : null;
+ this.shouldTransferModFile = isWithMod;
+ refreshModFileState();
// TODO: process the shared mod file
this.sharedModFile =
resource.getSharedModFile() != null ? resource.getSharedModFile().getFile() : null;
@@ -276,6 +278,7 @@ public File getTsFile() {
}

public File getModFile() {
refreshModFileState();
return modFile;
}

@@ -284,13 +287,13 @@ public File getSharedModFile() {
}

public boolean isWithMod() {
refreshModFileState();
return isWithMod;
}

- // If the previous "isWithMod" is false, the modFile has been set to "null", then the isWithMod
- // can't be set to true
- public void disableMod4NonTransferPipes(final boolean isWithMod) {
- this.isWithMod = isWithMod && this.isWithMod;
+ public void disableMod4NonTransferPipes(final boolean shouldTransferModFile) {
+ this.shouldTransferModFile = shouldTransferModFile && this.shouldTransferModFile;
+ refreshModFileState();
}

public boolean isLoaded() {
@@ -323,6 +326,7 @@ public long getExtractTime() {
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
extractTime = System.nanoTime();
try {
refreshModFileState();
tsFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, pipeName);
if (isWithMod) {
modFile =
@@ -423,7 +427,7 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep
getSourceDatabaseNameFromDataRegion(),
resource,
tsFile,
- isWithMod,
+ shouldTransferModFile,
isLoaded,
isGeneratedByHistoricalExtractor,
tableNames,
@@ -754,6 +758,7 @@ public boolean isGeneratedByHistoricalExtractor() {

private TsFileInsertionEventParser initEventParser() {
try {
refreshModFileState();
eventParser.compareAndSet(
null,
new TsFileInsertionEventParserProvider(
@@ -848,6 +853,7 @@ public void trackResource() {

@Override
public PipeEventResource eventResourceBuilder() {
refreshModFileState();
return new PipeTsFileInsertionEventResource(
this.isReleased,
this.referenceCount,
@@ -859,6 +865,17 @@ public PipeEventResource eventResourceBuilder() {
this.eventParser);
}

private void refreshModFileState() {
if (!shouldTransferModFile || Objects.isNull(resource)) {
isWithMod = false;
modFile = null;
return;
}

isWithMod = resource.anyModFileExists();
modFile = isWithMod ? resource.getExclusiveModFile().getFile() : null;
Comment on lines +974 to +988
Copilot AI Mar 22, 2026

refreshModFileState() always resets modFile from resource.getExclusiveModFile().getFile(). After internallyIncreaseResourceReferenceCount() runs, modFile may already have been replaced with the hardlinked/copied file returned by PipeTsFileResourceManager.increaseFileReference(...); subsequent calls to getModFile()/isWithMod() will refresh again and can overwrite that pinned path. This can break reference tracking (leaking the pinned mod copy and/or decreasing the wrong path) and can make the sink transfer a different file than the one whose reference count was increased. Consider preventing refresh from overwriting modFile once the event is pinned (e.g., when referenceCount > 0), or track original-vs-pinned mod file paths separately.

Collaborator


+1. It would be better to always pin the copied mod file, and to copy the file when it is checked.

Collaborator Author


Good catch — this is a real issue.

PipeTsFileResourceManager.increaseFileReference(...) may replace modFile with a copied file under the pipe dir. If we keep refreshing from resource.getExclusiveModFile().getFile(), we can overwrite that pinned path, which breaks ref tracking (leaks the copied mod / decreases the wrong path) and can make the sink transfer an unpinned file.

I updated refreshModFileState() to not refresh once the event is pinned (referenceCount > 0), so modFile remains the pinned (copied) file after increaseReferenceCount(). Also added a regression test testPinnedModFilePathIsStableAfterIncreaseReferenceCount to ensure repeated getModFile()/isWithMod() calls don't revert to the original mod path.

Patch is pushed to this PR branch.

Collaborator Author


Thanks! Agreed. Followed this direction: once the event is pinned we keep modFile as the copied/pinned file (no overwrite back to resource.getExclusiveModFile()), and if the mod is created after pinning we now lazily pin/copy it and then keep that pinned path stable. Added a regression test to ensure pinned mod path stability after increaseReferenceCount().

Collaborator Author


Update: I adjusted the implementation slightly. Instead of completely skipping refresh after pinning, we now avoid overwriting an already-pinned modFile path, but still support the case where the mod file is created after pinning by lazily pinning/copying it (and then keeping that pinned path stable). This keeps ref tracking correct while covering the late-mod edge case.
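
The rule described in this thread (never overwrite an already-pinned mod path, but lazily pin a mod file that appears after pinning) could be sketched roughly as below. This is a simplified model and not the code pushed to the PR: `ModFileStateSketch`, the `sourceModFile` supplier (standing in for `resource.getExclusiveModFile().getFile()`, and assumed to return `null` while no mod exists) and the `pinner` function (standing in for `increaseFileReference(...)`) are hypothetical names. Thread safety and releasing the reference are left out.

```java
import java.io.File;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Simplified, hypothetical model of the refresh rule; not the actual IoTDB class.
final class ModFileStateSketch {
  // Stand-in for resource.getExclusiveModFile().getFile(); yields null while no mod exists.
  private final Supplier<File> sourceModFile;
  // Stand-in for the hard-link/copy step; returns the path inside the pipe dir.
  private final UnaryOperator<File> pinner;
  private final boolean shouldTransferModFile;

  private boolean eventPinned; // models referenceCount > 0
  private boolean modPinned; // true once modFile points at the pinned copy
  private File modFile;

  ModFileStateSketch(
      boolean shouldTransferModFile, Supplier<File> sourceModFile, UnaryOperator<File> pinner) {
    this.shouldTransferModFile = shouldTransferModFile;
    this.sourceModFile = sourceModFile;
    this.pinner = pinner;
  }

  // Models increaseReferenceCount(): pins the mod file if one already exists.
  void pin() {
    eventPinned = true;
    refresh();
  }

  File getModFile() {
    refresh();
    return modFile;
  }

  boolean isWithMod() {
    return getModFile() != null;
  }

  private void refresh() {
    if (!shouldTransferModFile) {
      modFile = null;
      return;
    }
    if (modPinned) {
      return; // keep the pinned path stable, never fall back to the source path
    }
    final File current = sourceModFile.get();
    if (current == null) {
      modFile = null;
    } else if (eventPinned) {
      modFile = pinner.apply(current); // late-created mod: pin lazily, exactly once
      modPinned = true;
    } else {
      modFile = current; // not pinned yet: simply follow the resource
    }
  }
}
```

With this shape, the path returned after pinning is the one produced by `pinner`, so whatever later decreases the reference would see the same path that was increased.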

}

private static class PipeTsFileInsertionEventResource extends PipeEventResource {

private final File tsFile;
Original file line number Diff line number Diff line change
@@ -382,7 +382,7 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
new TConsensusGroupId(TConsensusGroupType.DataRegion, consensusGroupId);

// 1. Transfer tsFile, and mod file if exists
- if (pipeTsFileInsertionEvent.isWithMod()) {
+ if (modFile != null) {
transferFilePieces(
modFile, syncIoTConsensusV2ServiceClient, true, tCommitId, tConsensusGroupId);
transferFilePieces(
Original file line number Diff line number Diff line change
@@ -46,7 +46,6 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
- import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

public class IoTConsensusV2TsFileInsertionEventHandler
@@ -101,17 +100,14 @@ public IoTConsensusV2TsFileInsertionEventHandler(

tsFile = event.getTsFile();
modFile = event.getModFile();
- transferMod = event.isWithMod();
+ transferMod = modFile != null;
currentFile = transferMod ? modFile : tsFile;

readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
readBuffer = new byte[readFileBufferSize];
position = 0;

- reader =
- Objects.nonNull(modFile)
- ? new RandomAccessFile(modFile, "r")
- : new RandomAccessFile(tsFile, "r");
+ reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r");

isSealSignalSent = new AtomicBoolean(false);

Original file line number Diff line number Diff line change
@@ -404,6 +404,12 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE
throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
}

final boolean supportMod = clientManager.supportModsIfIsDataNodeReceiver();
final File modFile =
(supportMod && pipeTsFileInsertionEvent.isWithMod())
? pipeTsFileInsertionEvent.getModFile()
: null;
Copilot AI Mar 22, 2026


This new modFile snapshot still gates getModFile() behind pipeTsFileInsertionEvent.isWithMod(). With the new dynamic refresh semantics, this can miss a late-created mod file that appears after isWithMod() returns false (because getModFile() is never called). To reliably capture late-created mods and avoid double-refresh/TOCTOU, take a single snapshot via getModFile() (and then decide transfer based on modFile != null), while still honoring supportMod.

Suggested change
- (supportMod && pipeTsFileInsertionEvent.isWithMod())
- ? pipeTsFileInsertionEvent.getModFile()
- : null;
+ supportMod ? pipeTsFileInsertionEvent.getModFile() : null;

Collaborator


+1

Collaborator Author


Agreed — updated this sink to take a single snapshot via getModFile() (when supportMod is true) and then decide by modFile != null.

This removes the isWithMod() gate/TOCTOU window and ensures late-created mods can still be observed. Also, PipeTsFileInsertionEvent.refreshModFileState() now keeps pinned paths stable while still lazily pinning a mod file created after the event is already pinned.
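
To illustrate the single-snapshot idea from this thread: the sink reads the mod file once and derives every later decision from that one value, instead of calling `isWithMod()` and `getModFile()` separately (where the state could change between the two calls). The sketch below is a hedged, stand-alone model; `ModSnapshotSketch`, `TransferPlan` and the `modFileGetter` parameter are hypothetical names, with the getter standing in for `pipeTsFileInsertionEvent.getModFile()`.

```java
import java.io.File;
import java.util.function.Supplier;

// Hypothetical illustration of "snapshot once, then decide"; not the actual sink code.
final class ModSnapshotSketch {

  // Immutable decision derived from exactly one read of the mod file.
  static final class TransferPlan {
    final File modFile;
    final boolean transferMod;

    TransferPlan(File modFile) {
      this.modFile = modFile;
      this.transferMod = modFile != null;
    }
  }

  // Reads the mod file at most once; the getter is not touched when the receiver lacks mod support.
  static TransferPlan plan(boolean supportMod, Supplier<File> modFileGetter) {
    final File modFile = supportMod ? modFileGetter.get() : null;
    return new TransferPlan(modFile);
  }
}
```

Because `transferMod` is computed from the same reference that is later transferred, the flag and the file cannot disagree, which is the property the suggested change is after.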


final PipeTransferTsFileHandler pipeTransferTsFileHandler =
new PipeTransferTsFileHandler(
this,
@@ -416,9 +422,8 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE
new AtomicInteger(1),
new AtomicBoolean(false),
pipeTsFileInsertionEvent.getTsFile(),
- pipeTsFileInsertionEvent.getModFile(),
- pipeTsFileInsertionEvent.isWithMod()
- && clientManager.supportModsIfIsDataNodeReceiver(),
+ modFile,
+ modFile != null,
pipeTsFileInsertionEvent.isTableModelEvent()
? pipeTsFileInsertionEvent.getTableModelDatabaseName()
: null);
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
@@ -36,6 +37,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
@@ -157,6 +159,82 @@ public void testAuthCheck() throws Exception {
}
}

@Test
public void testLateCreatedModFileCanStillBeObservedAfterShallowCopy() throws Exception {
final File baseDir = new File(TestConstant.BASE_OUTPUT_PATH, "late-mod");
final File tsFile = new File(baseDir, "late-mod.tsfile");
PipeTsFileInsertionEvent originalEvent = null;
PipeTsFileInsertionEvent copiedEvent = null;
try {
Assert.assertTrue(baseDir.mkdirs() || baseDir.exists());
Assert.assertTrue(tsFile.createNewFile() || tsFile.exists());

final TsFileResource resource = new TsFileResource(tsFile);
resource.setStatus(TsFileResourceStatus.NORMAL);

originalEvent =
new PipeTsFileInsertionEvent(
false,
"root.db",
resource,
null,
true,
false,
false,
null,
"testPipe",
1L,
null,
buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
new TablePattern(false, null, null),
null,
null,
null,
true,
Long.MIN_VALUE,
Long.MAX_VALUE);
copiedEvent =
originalEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
"testPipeCopy",
2L,
null,
buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
new TablePattern(false, null, null),
null,
null,
null,
true,
Long.MIN_VALUE,
Long.MAX_VALUE);

Assert.assertFalse(originalEvent.isWithMod());
Assert.assertFalse(copiedEvent.isWithMod());

resource
.getExclusiveModFile()
.write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1));
final File modFile = resource.getExclusiveModFile().getFile();
Assert.assertTrue(modFile.exists());

Assert.assertTrue(originalEvent.isWithMod());
Assert.assertEquals(modFile, originalEvent.getModFile());
Assert.assertTrue(copiedEvent.isWithMod());
Assert.assertEquals(modFile, copiedEvent.getModFile());

Copilot AI Mar 22, 2026


The added test validates late-created mod visibility on the in-memory event objects, but it doesn't cover the more typical transfer path where increaseReferenceCount() is called and the mod is copied into the pipe hardlink/copy directory. Given the new dynamic refresh logic, adding assertions around increaseReferenceCount() + getModFile() would help catch regressions where the returned modFile path changes after pinning (and reference counts are decreased for a different file).

Suggested change
// Verify that after increasing the reference count (typical transfer path),
// the mod file observed by the copied event remains stable and valid.
copiedEvent.increaseReferenceCount();
final File pinnedModFile = copiedEvent.getModFile();
Assert.assertNotNull(pinnedModFile);
Assert.assertTrue(pinnedModFile.exists());
Assert.assertEquals(pinnedModFile, copiedEvent.getModFile());

Collaborator Author


Good point. I added a dedicated regression test testPinnedModFilePathIsStableAfterIncreaseReferenceCount to cover the typical transfer path:

  • create the mod after event construction
  • call increaseReferenceCount() to pin/copy it into the pipe dir
  • assert the returned modFile path is stable across repeated getModFile()/isWithMod() calls (and differs from the original path)

This should catch regressions where the pinned mod path gets overwritten or ref counting becomes asymmetric.
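
For readers without the IoTDB test harness at hand, the property that the regression test guards can be approximated with plain JDK file operations. The sketch below is only an analogy under stated assumptions: `PinnedModPathCheck`, its `pin` helper and the `example.mods` file name are hypothetical, and pinning is modelled as a plain copy into a per-pipe directory (the real resource manager may hard-link or copy).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-alone illustration; it does not use the IoTDB test harness.
final class PinnedModPathCheck {

  // Stand-in for the pin step: copy the mod file into a per-pipe directory.
  static Path pin(Path modFile, Path pipeDir) throws IOException {
    Files.createDirectories(pipeDir);
    return Files.copy(modFile, pipeDir.resolve(modFile.getFileName()));
  }

  // Returns true when the pinned copy is distinct from, and unaffected by, the original.
  static boolean run() {
    try {
      final Path base = Files.createTempDirectory("late-mod");
      final Path original = Files.write(base.resolve("example.mods"), new byte[] {1});
      final Path pinned = pin(original, base.resolve("pipe"));
      try {
        // A later write to the original must not leak into the pinned copy.
        Files.write(original, new byte[] {1, 2});
        return !pinned.equals(original) && Files.exists(pinned) && Files.size(pinned) == 1;
      } finally {
        // Remove files first, then the now-empty directories.
        Files.deleteIfExists(pinned);
        Files.deleteIfExists(pinned.getParent());
        Files.deleteIfExists(original);
        Files.deleteIfExists(base);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

If the event handed out the original path after pinning, a sink could read content that changed underneath it, which is the asymmetry the real test is meant to catch.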

copiedEvent.disableMod4NonTransferPipes(false);
Assert.assertFalse(copiedEvent.isWithMod());
Assert.assertNull(copiedEvent.getModFile());
} finally {
if (originalEvent != null) {
originalEvent.close();
}
if (copiedEvent != null) {
copiedEvent.close();
}
FileUtils.deleteFileOrDirectory(baseDir);
}
}

static class TestAccessControl implements AccessControl {

@Override