Skip to content

Commit b224633

Browse files
authored
Fixed TsFileProcessorTest may check the result before the TsFile is closed (#17232)
1 parent ea3cff5 commit b224633

2 files changed

Lines changed: 1 addition & 206 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,53 +1336,6 @@ public Future<?> asyncClose() {
13361336
return CompletableFuture.completedFuture(null);
13371337
}
13381338

1339-
/**
1340-
* TODO if the flushing thread is too fast, the tmpMemTable.wait() may never wakeup Tips: I am
1341-
* trying to solve this issue by checking whether the table exist before wait()
1342-
*/
1343-
@TestOnly
1344-
public void syncFlush() throws IOException {
1345-
IMemTable tmpMemTable;
1346-
flushQueryLock.writeLock().lock();
1347-
logFlushQueryWriteLocked();
1348-
try {
1349-
tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
1350-
if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
1351-
logger.debug(
1352-
"{}: {} add a signal memtable into flushing memtable list when sync flush",
1353-
dataRegionName,
1354-
tsFileResource.getTsFile().getName());
1355-
}
1356-
addAMemtableIntoFlushingList(tmpMemTable);
1357-
} finally {
1358-
flushQueryLock.writeLock().unlock();
1359-
logFlushQueryWriteUnlocked();
1360-
}
1361-
1362-
synchronized (flushingMemTables) {
1363-
try {
1364-
long startWait = System.currentTimeMillis();
1365-
while (flushingMemTables.contains(tmpMemTable)) {
1366-
flushingMemTables.wait(1000);
1367-
1368-
if ((System.currentTimeMillis() - startWait) > 60_000) {
1369-
logger.warn(
1370-
"has waited for synced flushing a memtable in {} for 60 seconds.",
1371-
this.tsFileResource.getTsFile().getAbsolutePath());
1372-
startWait = System.currentTimeMillis();
1373-
}
1374-
}
1375-
} catch (InterruptedException e) {
1376-
logger.error(
1377-
"{}: {} wait flush finished meets error",
1378-
dataRegionName,
1379-
tsFileResource.getTsFile().getName(),
1380-
e);
1381-
Thread.currentThread().interrupt();
1382-
}
1383-
}
1384-
}
1385-
13861339
/** Put the working memtable into flushing list and set the working memtable to null */
13871340
public void asyncFlush() {
13881341
flushQueryLock.writeLock().lock();
@@ -1393,7 +1346,7 @@ public void asyncFlush() {
13931346
}
13941347
logger.info(
13951348
"Async flush a memtable to tsfile: {}", tsFileResource.getTsFile().getAbsolutePath());
1396-
addAMemtableIntoFlushingList(workMemTable);
1349+
closeFuture = addAMemtableIntoFlushingList(workMemTable);
13971350
} catch (Exception e) {
13981351
logger.error(
13991352
"{}: {} add a memtable into flushing list failed",

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java

Lines changed: 0 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545

4646
import org.apache.tsfile.enums.TSDataType;
4747
import org.apache.tsfile.external.commons.io.FileUtils;
48-
import org.apache.tsfile.file.metadata.ChunkMetadata;
4948
import org.apache.tsfile.file.metadata.IDeviceID;
5049
import org.apache.tsfile.file.metadata.enums.CompressionType;
5150
import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -60,7 +59,6 @@
6059
import org.apache.tsfile.write.record.TSRecord;
6160
import org.apache.tsfile.write.record.datapoint.DataPoint;
6261
import org.apache.tsfile.write.schema.MeasurementSchema;
63-
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
6462
import org.junit.After;
6563
import org.junit.Assert;
6664
import org.junit.Before;
@@ -183,12 +181,6 @@ public void testWriteAndFlush()
183181
}
184182

185183
// flush synchronously
186-
processor.syncFlush();
187-
188-
tsfileResourcesForQuery.clear();
189-
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
190-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
191-
assertEquals(1, tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
192184
processor.syncClose();
193185

194186
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -258,12 +250,6 @@ public void testFlushMultiChunks()
258250
}
259251

260252
// flush synchronously
261-
processor.syncFlush();
262-
263-
tsfileResourcesForQuery.clear();
264-
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
265-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
266-
assertEquals(3, tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
267253
processor.syncClose();
268254

269255
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -337,12 +323,6 @@ public void testFlushMultiBinaryChunks()
337323
}
338324

339325
// flush synchronously
340-
processor.syncFlush();
341-
342-
tsfileResourcesForQuery.clear();
343-
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
344-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
345-
assertEquals(3, tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
346326
processor.syncClose();
347327

348328
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -423,12 +403,6 @@ public void testFlushMultiAlignedChunks()
423403
}
424404

425405
// flush synchronously
426-
processor.syncFlush();
427-
428-
tsfileResourcesForQuery.clear();
429-
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
430-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
431-
assertEquals(3, tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
432406
processor.syncClose();
433407

434408
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -512,12 +486,6 @@ public void testFlushMultiAlignedBinaryChunks()
512486
}
513487

514488
// flush synchronously
515-
processor.syncFlush();
516-
517-
tsfileResourcesForQuery.clear();
518-
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
519-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
520-
assertEquals(3, tsfileResourcesForQuery.get(0).getChunkMetadataList(fullPath).size());
521489
processor.syncClose();
522490

523491
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
@@ -537,132 +505,6 @@ public void testFlushMultiAlignedBinaryChunks()
537505
}
538506
}
539507

540-
@Test
541-
public void testWriteAndRestoreMetadata()
542-
throws IOException, WriteProcessException, MetadataException, ExecutionException {
543-
logger.info("testWriteAndRestoreMetadata begin..");
544-
processor =
545-
new TsFileProcessor(
546-
storageGroup,
547-
SystemFileFactory.INSTANCE.getFile(filePath),
548-
sgInfo,
549-
this::closeTsFileProcessor,
550-
(tsFileProcessor, updateMap, systemFlushTime) -> {},
551-
true);
552-
553-
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
554-
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
555-
this.sgInfo.initTsFileProcessorInfo(processor);
556-
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
557-
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
558-
NonAlignedFullPath fullPath =
559-
new NonAlignedFullPath(
560-
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
561-
new MeasurementSchema(
562-
measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props));
563-
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
564-
assertTrue(tsfileResourcesForQuery.isEmpty());
565-
566-
for (int i = 1; i <= 100; i++) {
567-
TSRecord record = new TSRecord(deviceId, i);
568-
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
569-
processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]);
570-
}
571-
572-
// query data in memory
573-
tsfileResourcesForQuery.clear();
574-
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
575-
assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
576-
int num = 1;
577-
List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath);
578-
for (ReadOnlyMemChunk chunk : memChunks) {
579-
IPointReader iterator = chunk.getPointReader();
580-
for (; num <= 100; num++) {
581-
iterator.hasNextTimeValuePair();
582-
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
583-
assertEquals(num, timeValuePair.getTimestamp());
584-
assertEquals(num, timeValuePair.getValue().getInt());
585-
}
586-
}
587-
logger.info("syncFlush..");
588-
// flush synchronously
589-
processor.syncFlush();
590-
591-
tsfileResourcesForQuery.clear();
592-
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
593-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
594-
595-
RestorableTsFileIOWriter tsFileIOWriter = processor.getWriter();
596-
Map<IDeviceID, List<ChunkMetadata>> chunkMetaDataListInChunkGroups =
597-
tsFileIOWriter.getDeviceChunkMetadataMap();
598-
RestorableTsFileIOWriter restorableTsFileIOWriter =
599-
new RestorableTsFileIOWriter(SystemFileFactory.INSTANCE.getFile(filePath));
600-
Map<IDeviceID, List<ChunkMetadata>> restoredChunkMetaDataListInChunkGroups =
601-
restorableTsFileIOWriter.getDeviceChunkMetadataMap();
602-
assertEquals(
603-
chunkMetaDataListInChunkGroups.size(), restoredChunkMetaDataListInChunkGroups.size());
604-
for (Map.Entry<IDeviceID, List<ChunkMetadata>> entry1 :
605-
chunkMetaDataListInChunkGroups.entrySet()) {
606-
for (Map.Entry<IDeviceID, List<ChunkMetadata>> entry2 :
607-
restoredChunkMetaDataListInChunkGroups.entrySet()) {
608-
assertEquals(entry1.getKey(), entry2.getKey());
609-
assertEquals(entry1.getValue().size(), entry2.getValue().size());
610-
for (int i = 0; i < entry1.getValue().size(); i++) {
611-
ChunkMetadata chunkMetaData = entry1.getValue().get(i);
612-
chunkMetaData.setVersion(0);
613-
ChunkMetadata chunkMetadataRestore = entry2.getValue().get(i);
614-
chunkMetadataRestore.setVersion(0);
615-
}
616-
}
617-
}
618-
restorableTsFileIOWriter.close();
619-
logger.info("syncClose..");
620-
processor.syncClose();
621-
// we need to close the tsfile writer first and then reopen it.
622-
}
623-
624-
@Test
625-
public void testMultiFlush()
626-
throws IOException, WriteProcessException, MetadataException, ExecutionException {
627-
processor =
628-
new TsFileProcessor(
629-
storageGroup,
630-
SystemFileFactory.INSTANCE.getFile(filePath),
631-
sgInfo,
632-
this::closeTsFileProcessor,
633-
(tsFileProcessor, updateMap, systemFlushTime) -> {},
634-
true);
635-
636-
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
637-
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
638-
this.sgInfo.initTsFileProcessorInfo(processor);
639-
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
640-
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
641-
NonAlignedFullPath fullPath =
642-
new NonAlignedFullPath(
643-
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
644-
new MeasurementSchema(
645-
measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props));
646-
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
647-
assertTrue(tsfileResourcesForQuery.isEmpty());
648-
649-
for (int flushId = 0; flushId < 10; flushId++) {
650-
for (int i = 1; i <= 10; i++) {
651-
TSRecord record = new TSRecord(deviceId, i);
652-
record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i)));
653-
processor.insert(buildInsertRowNodeByTSRecord(record), new long[5]);
654-
}
655-
processor.asyncFlush();
656-
}
657-
processor.syncFlush();
658-
659-
tsfileResourcesForQuery.clear();
660-
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
661-
assertFalse(tsfileResourcesForQuery.isEmpty());
662-
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
663-
processor.syncClose();
664-
}
665-
666508
@Test
667509
public void alignedTvListRamCostTest()
668510
throws MetadataException, WriteProcessException, IOException {

0 commit comments

Comments
 (0)