-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix(iotv2): order delete materialization and tsfile snapshot #17335
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
e060d1e
4262a1f
86c8fe9
b4418e4
f2c61a0
76306a2
6f96abe
ab77341
8c17a6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -404,6 +404,12 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE | |||||||||
| throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath()); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| final boolean supportMod = clientManager.supportModsIfIsDataNodeReceiver(); | ||||||||||
| final File modFile = | ||||||||||
| (supportMod && pipeTsFileInsertionEvent.isWithMod()) | ||||||||||
| ? pipeTsFileInsertionEvent.getModFile() | ||||||||||
| : null; | ||||||||||
|
||||||||||
| (supportMod && pipeTsFileInsertionEvent.isWithMod()) | |
| ? pipeTsFileInsertionEvent.getModFile() | |
| : null; | |
| supportMod ? pipeTsFileInsertionEvent.getModFile() : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed — updated this sink to take a single snapshot via getModFile() (when supportMod is true) and then decide by modFile != null.
This removes the isWithMod() gate/TOCTOU window and ensures late-created mods can still be observed. Also, PipeTsFileInsertionEvent.refreshModFileState() now keeps pinned paths stable while still lazily pinning a mod file created after the event is already pinned.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |||||||||||||||||||
| import org.apache.iotdb.commons.auth.entity.PrivilegeType; | ||||||||||||||||||||
| import org.apache.iotdb.commons.conf.IoTDBConstant; | ||||||||||||||||||||
| import org.apache.iotdb.commons.exception.auth.AccessDeniedException; | ||||||||||||||||||||
| import org.apache.iotdb.commons.path.MeasurementPath; | ||||||||||||||||||||
| import org.apache.iotdb.commons.path.PartialPath; | ||||||||||||||||||||
| import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; | ||||||||||||||||||||
| import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; | ||||||||||||||||||||
|
|
@@ -36,6 +37,7 @@ | |||||||||||||||||||
| import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement; | ||||||||||||||||||||
| import org.apache.iotdb.db.queryengine.plan.statement.Statement; | ||||||||||||||||||||
| import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; | ||||||||||||||||||||
| import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; | ||||||||||||||||||||
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; | ||||||||||||||||||||
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; | ||||||||||||||||||||
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; | ||||||||||||||||||||
|
|
@@ -157,6 +159,82 @@ public void testAuthCheck() throws Exception { | |||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| @Test | ||||||||||||||||||||
| public void testLateCreatedModFileCanStillBeObservedAfterShallowCopy() throws Exception { | ||||||||||||||||||||
| final File baseDir = new File(TestConstant.BASE_OUTPUT_PATH, "late-mod"); | ||||||||||||||||||||
| final File tsFile = new File(baseDir, "late-mod.tsfile"); | ||||||||||||||||||||
| PipeTsFileInsertionEvent originalEvent = null; | ||||||||||||||||||||
| PipeTsFileInsertionEvent copiedEvent = null; | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| Assert.assertTrue(baseDir.mkdirs() || baseDir.exists()); | ||||||||||||||||||||
| Assert.assertTrue(tsFile.createNewFile() || tsFile.exists()); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| final TsFileResource resource = new TsFileResource(tsFile); | ||||||||||||||||||||
| resource.setStatus(TsFileResourceStatus.NORMAL); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| originalEvent = | ||||||||||||||||||||
| new PipeTsFileInsertionEvent( | ||||||||||||||||||||
| false, | ||||||||||||||||||||
| "root.db", | ||||||||||||||||||||
| resource, | ||||||||||||||||||||
| null, | ||||||||||||||||||||
| true, | ||||||||||||||||||||
| false, | ||||||||||||||||||||
| false, | ||||||||||||||||||||
| null, | ||||||||||||||||||||
| "testPipe", | ||||||||||||||||||||
| 1L, | ||||||||||||||||||||
| null, | ||||||||||||||||||||
| buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), | ||||||||||||||||||||
| new TablePattern(false, null, null), | ||||||||||||||||||||
| null, | ||||||||||||||||||||
| null, | ||||||||||||||||||||
| null, | ||||||||||||||||||||
| true, | ||||||||||||||||||||
| Long.MIN_VALUE, | ||||||||||||||||||||
| Long.MAX_VALUE); | ||||||||||||||||||||
| copiedEvent = | ||||||||||||||||||||
| originalEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport( | ||||||||||||||||||||
| "testPipeCopy", | ||||||||||||||||||||
| 2L, | ||||||||||||||||||||
| null, | ||||||||||||||||||||
| buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), | ||||||||||||||||||||
| new TablePattern(false, null, null), | ||||||||||||||||||||
| null, | ||||||||||||||||||||
| null, | ||||||||||||||||||||
| null, | ||||||||||||||||||||
| true, | ||||||||||||||||||||
| Long.MIN_VALUE, | ||||||||||||||||||||
| Long.MAX_VALUE); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Assert.assertFalse(originalEvent.isWithMod()); | ||||||||||||||||||||
| Assert.assertFalse(copiedEvent.isWithMod()); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| resource | ||||||||||||||||||||
| .getExclusiveModFile() | ||||||||||||||||||||
| .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1)); | ||||||||||||||||||||
| final File modFile = resource.getExclusiveModFile().getFile(); | ||||||||||||||||||||
| Assert.assertTrue(modFile.exists()); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Assert.assertTrue(originalEvent.isWithMod()); | ||||||||||||||||||||
| Assert.assertEquals(modFile, originalEvent.getModFile()); | ||||||||||||||||||||
| Assert.assertTrue(copiedEvent.isWithMod()); | ||||||||||||||||||||
| Assert.assertEquals(modFile, copiedEvent.getModFile()); | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
||||||||||||||||||||
| // Verify that after increasing the reference count (typical transfer path), | |
| // the mod file observed by the copied event remains stable and valid. | |
| copiedEvent.increaseReferenceCount(); | |
| final File pinnedModFile = copiedEvent.getModFile(); | |
| Assert.assertNotNull(pinnedModFile); | |
| Assert.assertTrue(pinnedModFile.exists()); | |
| Assert.assertEquals(pinnedModFile, copiedEvent.getModFile()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I added a dedicated regression test testPinnedModFilePathIsStableAfterIncreaseReferenceCount to cover the typical transfer path:
- create the mod after event construction
- call
increaseReferenceCount()to pin/copy it into the pipe dir - assert the returned
modFilepath is stable across repeatedgetModFile()/isWithMod()calls (and differs from the original path)
This should catch regressions where the pinned mod path gets overwritten or ref counting becomes asymmetric.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refreshModFileState()always resetsmodFilefromresource.getExclusiveModFile().getFile(). AfterinternallyIncreaseResourceReferenceCount()runs,modFilemay already have been replaced with the hardlinked/copied file returned byPipeTsFileResourceManager.increaseFileReference(...); subsequent calls togetModFile()/isWithMod()will refresh again and can overwrite that pinned path. This can break reference tracking (leaking the pinned mod copy and/or decreasing the wrong path) and can make the sink transfer a different file than the one whose reference count was increased. Consider preventing refresh from overwritingmodFileonce the event is pinned (e.g., whenreferenceCount > 0), or track original-vs-pinned mod file paths separately.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1,Better always pin the copied mod file, and copy the file on checked
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch — this is a real issue.
PipeTsFileResourceManager.increaseFileReference(...)may replacemodFilewith a copied file under the pipe dir. If we keep refreshing fromresource.getExclusiveModFile().getFile(), we can overwrite that pinned path, which breaks ref tracking (leaks the copied mod / decreases the wrong path) and can make the sink transfer an unpinned file.I updated
refreshModFileState()to not refresh once the event is pinned (referenceCount > 0), somodFileremains the pinned (copied) file afterincreaseReferenceCount(). Also added a regression testtestPinnedModFilePathIsStableAfterIncreaseReferenceCountto ensure repeatedgetModFile()/isWithMod()calls don\x27t revert to the original mod path.Patch is pushed to this PR branch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Agreed. Followed this direction: once the event is pinned we keep
modFileas the copied/pinned file (no overwrite back toresource.getExclusiveModFile()), and if the mod is created after pinning we now lazily pin/copy it and then keep that pinned path stable. Added a regression test to ensure pinned mod path stability afterincreaseReferenceCount().There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: I adjusted the implementation slightly. Instead of completely skipping refresh after pinning, we now avoid overwriting an already-pinned
modFilepath, but still support the case where the mod file is created after pinning by lazily pinning/copying it (and then keeping that pinned path stable). This keeps ref tracking correct while covering the late-mod edge case.