Skip to content

Commit cbfb381

Browse files
committed
Adjust to call the method that supports obtaining data region configurations at the database granularity
1 parent 7f5aa4b commit cbfb381

61 files changed

Lines changed: 1014 additions & 1045 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBTimePartitionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void testTimePartition() throws Exception {
142142
}
143143
timestatmps.forEach(
144144
t -> {
145-
long timePartitionId = TimePartitionUtils.getTimePartitionId(t);
145+
long timePartitionId = TimePartitionUtils.getTimePartitionId(t, "root.sg1");
146146
assertTrue(timePartitions.contains(timePartitionId));
147147
});
148148
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/AutoCleanPartitionTablePlan.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,25 @@
3535
public class AutoCleanPartitionTablePlan extends ConfigPhysicalPlan {
3636

3737
Map<String, Long> databaseTTLMap;
38-
TTimePartitionSlot currentTimeSlot;
38+
Map<String, TTimePartitionSlot> currentTimeSlotMap;
3939

4040
public AutoCleanPartitionTablePlan() {
4141
super(ConfigPhysicalPlanType.AutoCleanPartitionTable);
4242
}
4343

4444
public AutoCleanPartitionTablePlan(
45-
Map<String, Long> databaseTTLMap, TTimePartitionSlot currentTimeSlot) {
45+
Map<String, Long> databaseTTLMap, Map<String, TTimePartitionSlot> currentTimeSlotMap) {
4646
this();
4747
this.databaseTTLMap = databaseTTLMap;
48-
this.currentTimeSlot = currentTimeSlot;
48+
this.currentTimeSlotMap = currentTimeSlotMap;
4949
}
5050

5151
public Map<String, Long> getDatabaseTTLMap() {
5252
return databaseTTLMap;
5353
}
5454

55-
public TTimePartitionSlot getCurrentTimeSlot() {
56-
return currentTimeSlot;
55+
public Map<String, TTimePartitionSlot> getCurrentTimeSlotMap() {
56+
return currentTimeSlotMap;
5757
}
5858

5959
@Override
@@ -64,7 +64,7 @@ protected void serializeImpl(DataOutputStream stream) throws IOException {
6464
BasicStructureSerDeUtil.write(entry.getKey(), stream);
6565
stream.writeLong(entry.getValue());
6666
}
67-
ThriftCommonsSerDeUtils.serializeTTimePartitionSlot(currentTimeSlot, stream);
67+
ThriftCommonsSerDeUtils.serializeTTimePartitionSlot(currentTimeSlotMap, stream);
6868
}
6969

7070
@Override
@@ -76,7 +76,8 @@ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
7676
long value = buffer.getLong();
7777
databaseTTLMap.put(key, value);
7878
}
79-
currentTimeSlot = ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
79+
currentTimeSlotMap =
80+
ThriftCommonsSerDeUtils.deserializeTTimePartitionSlotMap(buffer, new TreeMap<>());
8081
}
8182

8283
@Override
@@ -89,11 +90,11 @@ public boolean equals(Object o) {
8990
}
9091
AutoCleanPartitionTablePlan that = (AutoCleanPartitionTablePlan) o;
9192
return Objects.equals(databaseTTLMap, that.databaseTTLMap)
92-
&& Objects.equals(currentTimeSlot, that.currentTimeSlot);
93+
&& Objects.equals(currentTimeSlotMap, that.currentTimeSlotMap);
9394
}
9495

9596
@Override
9697
public int hashCode() {
97-
return Objects.hash(super.hashCode(), databaseTTLMap, currentTimeSlot);
98+
return Objects.hash(super.hashCode(), databaseTTLMap, currentTimeSlotMap);
9899
}
99100
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,8 @@ public TSStatus autoCleanPartitionTable(AutoCleanPartitionTablePlan plan) {
521521
if (isDatabaseExisted(database) && 0 < ttl && ttl < Long.MAX_VALUE) {
522522
databasePartitionTables
523523
.get(database)
524-
.autoCleanPartitionTable(ttl, plan.getCurrentTimeSlotMap());
524+
.autoCleanPartitionTable(
525+
ttl, plan.getCurrentTimeSlotMap().getOrDefault(database, null));
525526
}
526527
});
527528
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ private static DataPartitionQueryParam getTreeDataPartitionQueryParam(
215215
InsertTabletStatement statement, MPPQueryContext context) {
216216
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
217217
dataPartitionQueryParam.setDeviceID(statement.getDevicePath().getIDeviceIDAsFullDevice());
218-
dataPartitionQueryParam.setTimePartitionSlotList(statement.getTimePartitionSlots());
218+
dataPartitionQueryParam.setTimePartitionSlotList(
219+
statement.getTimePartitionSlots(getDatabaseName(statement, context)));
219220
dataPartitionQueryParam.setDatabaseName(getDatabaseName(statement, context));
220221
return dataPartitionQueryParam;
221222
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3428,7 +3428,7 @@ public SettableFuture<ConfigTaskResult> getTimeSlotList(
34283428
} catch (final Exception e) {
34293429
future.setException(e);
34303430
}
3431-
GetTimeSlotListTask.buildTSBlock(resp, future);
3431+
GetTimeSlotListTask.buildTSBlock(resp, future, getTimeSlotListStatement.getDatabase());
34323432
return future;
34333433
}
34343434

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/GetTimeSlotListTask.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,13 @@ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTask
5858
return configTaskExecutor.getTimeSlotList(getTimeSlotListStatement);
5959
}
6060

61-
public static void buildTSBlockRow(TsBlockBuilder builder, TTimePartitionSlot timePartitionSlot) {
61+
public static void buildTSBlockRow(
62+
TsBlockBuilder builder, TTimePartitionSlot timePartitionSlot, String database) {
6263
builder.getTimeColumnBuilder().writeLong(0L);
6364
builder
6465
.getColumnBuilder(0)
65-
.writeLong(TimePartitionUtils.getTimePartitionId(timePartitionSlot.getStartTime()));
66+
.writeLong(
67+
TimePartitionUtils.getTimePartitionId(timePartitionSlot.getStartTime(), database));
6668
builder
6769
.getColumnBuilder(1)
6870
.writeBinary(
@@ -73,14 +75,16 @@ public static void buildTSBlockRow(TsBlockBuilder builder, TTimePartitionSlot ti
7375
}
7476

7577
public static void buildTSBlock(
76-
TGetTimeSlotListResp getTimeSlotListResp, SettableFuture<ConfigTaskResult> future) {
78+
TGetTimeSlotListResp getTimeSlotListResp,
79+
SettableFuture<ConfigTaskResult> future,
80+
String database) {
7781
List<TSDataType> outputDataTypes =
7882
ColumnHeaderConstant.getTimeSlotListColumnHeaders.stream()
7983
.map(ColumnHeader::getColumnType)
8084
.collect(Collectors.toList());
8185
TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
8286

83-
getTimeSlotListResp.getTimeSlotList().forEach(e -> buildTSBlockRow(builder, e));
87+
getTimeSlotListResp.getTimeSlotList().forEach(e -> buildTSBlockRow(builder, e, database));
8488

8589
DatasetHeader datasetHeader = DatasetHeaderFactory.getGetTimeSlotListHeader();
8690
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,7 +1236,8 @@ private List<PlanNode> splitInnerTimeJoinNode(
12361236
InnerTimeJoinNode innerTimeJoinNode = (InnerTimeJoinNode) node.clone();
12371237
innerTimeJoinNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
12381238

1239-
List<Long> timePartitionIds = convertToTimePartitionIds(oneRegion);
1239+
List<Long> timePartitionIds =
1240+
convertToTimePartitionIds(oneRegion, analysis.getDatabaseName());
12401241
innerTimeJoinNode.setTimePartitions(timePartitionIds);
12411242

12421243
// region group id -> parent InnerTimeJoinNode
@@ -1285,10 +1286,11 @@ private List<PlanNode> splitInnerTimeJoinNode(
12851286
return subInnerJoinNode;
12861287
}
12871288

1288-
private List<Long> convertToTimePartitionIds(List<TTimePartitionSlot> timePartitionSlotList) {
1289+
private List<Long> convertToTimePartitionIds(
1290+
List<TTimePartitionSlot> timePartitionSlotList, String database) {
12891291
List<Long> res = new ArrayList<>(timePartitionSlotList.size());
12901292
for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {
1291-
res.add(TimePartitionUtils.getTimePartitionId(timePartitionSlot.startTime));
1293+
res.add(TimePartitionUtils.getTimePartitionId(timePartitionSlot.startTime, database));
12921294
}
12931295
return res;
12941296
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,7 +1193,8 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException {
11931193
}
11941194

11951195
// init map
1196-
long timePartitionId = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
1196+
long timePartitionId =
1197+
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime(), databaseName);
11971198
initFlushTimeMap(timePartitionId);
11981199

11991200
boolean isSequence =
@@ -1257,15 +1258,15 @@ private void split(
12571258
int before = loc;
12581259
long beforeTime = insertTabletNode.getTimes()[before];
12591260
// before time partition
1260-
long beforeTimePartition = TimePartitionUtils.getTimePartitionId(beforeTime);
1261+
long beforeTimePartition = TimePartitionUtils.getTimePartitionId(beforeTime, databaseName);
12611262
// init flush time map
12621263
initFlushTimeMap(beforeTimePartition);
12631264

12641265
// if is sequence
12651266
boolean isSequence = false;
12661267
while (loc < endOffset) {
12671268
long time = insertTabletNode.getTimes()[loc];
1268-
final long timePartitionId = TimePartitionUtils.getTimePartitionId(time);
1269+
final long timePartitionId = TimePartitionUtils.getTimePartitionId(time, databaseName);
12691270

12701271
long lastFlushTime;
12711272
// judge if we should insert sequence
@@ -1779,7 +1780,8 @@ private TsFileProcessor insertRowsWithTypeConsistencyCheck(
17791780
tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
17801781
} catch (DataTypeInconsistentException e) {
17811782
InsertRowNode firstRow = subInsertRowsNode.getInsertRowNodeList().get(0);
1782-
long timePartitionId = TimePartitionUtils.getTimePartitionId(firstRow.getTime());
1783+
long timePartitionId =
1784+
TimePartitionUtils.getTimePartitionId(firstRow.getTime(), databaseName);
17831785
// flush both MemTables so that the new type can be inserted into a new MemTable
17841786
TsFileProcessor workSequenceProcessor = workSequenceTsFileProcessors.get(timePartitionId);
17851787
if (workSequenceProcessor != null) {
@@ -4472,7 +4474,8 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
44724474
continue;
44734475
}
44744476
// init map
4475-
long timePartitionId = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
4477+
long timePartitionId =
4478+
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime(), databaseName);
44764479

44774480
if (config.isEnableSeparateData()
44784481
&& !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) {
@@ -4596,7 +4599,8 @@ public void insert(InsertRowsNode insertRowsNode)
45964599
continue;
45974600
}
45984601
// init map
4599-
timePartitionIds[i] = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
4602+
timePartitionIds[i] =
4603+
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime(), databaseName);
46004604

46014605
if (config.isEnableSeparateData()
46024606
&& !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionIds[i], true)) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,6 @@ public TsFileManager(String storageGroupName, String dataRegionId, String dataRe
6565
this.dataRegionId = dataRegionId;
6666
}
6767

68-
// @todo
69-
public List<TsFileResource> getTsFileList(boolean sequence) {
70-
return getTsFileList(sequence, null, null, "");
71-
}
72-
7368
public List<TsFileResource> getTsFileList(boolean sequence, String database) {
7469
return getTsFileList(sequence, null, null, database);
7570
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,12 +1124,7 @@ public long getTimePartition() {
11241124
* @throws PartitionViolationException if the data of the file spans partitions or it is empty
11251125
*/
11261126
public long getTimePartitionWithCheck() throws PartitionViolationException {
1127-
return timeIndex.getTimePartitionWithCheck(file.toString());
1128-
}
1129-
1130-
/** Check whether the tsFile spans multiple time partitions. */
1131-
public boolean isSpanMultiTimePartitions() {
1132-
return timeIndex.isSpanMultiTimePartitions();
1127+
return timeIndex.getTimePartitionWithCheck(file.toString(), getDatabaseName());
11331128
}
11341129

11351130
public void setExclusiveModFile(ModificationFile exclusiveModFile) {

0 commit comments

Comments
 (0)