Skip to content

Commit 1fdd4b3

Browse files
authored
Pipe: Refactored the dataRegionId data type to integer (#17214)
1 parent 9041e60 commit 1fdd4b3

22 files changed

Lines changed: 174 additions & 192 deletions

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
6262
// to trigger the general event transfer function, causing potentially such as
6363
// the random delay of the batch transmission. Therefore, here we inject cron events
6464
// when no event can be pulled.
65-
public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
66-
new PipeHeartbeatEvent("cron", false);
65+
public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = new PipeHeartbeatEvent(-1, false);
6766

6867
public PipeSinkSubtask(
6968
final String taskID,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,12 @@ public class DeletionResource implements PersistentResource {
5858
private volatile Exception cause;
5959

6060
public DeletionResource(
61-
AbstractDeleteDataNode deleteDataNode,
62-
Consumer<DeletionResource> removeHook,
63-
String regionId) {
61+
AbstractDeleteDataNode deleteDataNode, Consumer<DeletionResource> removeHook, int regionId) {
6462
this.deleteDataNode = deleteDataNode;
6563
this.removeHook = removeHook;
6664
this.currentStatus = Status.RUNNING;
6765
this.consensusGroupId =
68-
ConsensusGroupId.Factory.create(
69-
TConsensusGroupType.DataRegion.getValue(), Integer.parseInt(regionId));
66+
ConsensusGroupId.Factory.create(TConsensusGroupType.DataRegion.getValue(), regionId);
7067
this.pipeTaskReferenceCount =
7168
new AtomicInteger(
7269
DataRegionConsensusImpl.getInstance().getReplicationNum(consensusGroupId) - 1);
@@ -151,7 +148,7 @@ public ByteBuffer serialize() {
151148
}
152149

153150
public static DeletionResource deserialize(
154-
final ByteBuffer buffer, final String regionId, final Consumer<DeletionResource> removeHook)
151+
final ByteBuffer buffer, final int regionId, final Consumer<DeletionResource> removeHook)
155152
throws IOException {
156153
AbstractDeleteDataNode node = DeleteNodeType.deserializeFromDAL(buffer);
157154
return new DeletionResource(node, removeHook, regionId);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class DeletionResourceManager implements AutoCloseable {
6161
String.format(
6262
"^_(?<%s>\\d+)-(?<%s>\\d+)\\%s$",
6363
REBOOT_TIME, MEM_TABLE_FLUSH_ORDER, DELETION_FILE_SUFFIX);
64-
private final String dataRegionId;
64+
private final int dataRegionId;
6565
private final DeletionBuffer deletionBuffer;
6666
private final File storageDir;
6767
private final Map<AbstractDeleteDataNode, DeletionResource> deleteNode2ResourcesMap =
@@ -70,7 +70,7 @@ public class DeletionResourceManager implements AutoCloseable {
7070
private final Condition recoveryReadyCondition = recoverLock.newCondition();
7171
private volatile boolean hasCompletedRecovery = false;
7272

73-
private DeletionResourceManager(String dataRegionId) throws IOException {
73+
private DeletionResourceManager(int dataRegionId) throws IOException {
7474
this.dataRegionId = dataRegionId;
7575
this.storageDir =
7676
new File(
@@ -269,23 +269,23 @@ private boolean isFileProgressCoveredByGivenProgress(
269269

270270
//////////////////////////// singleton ////////////////////////////
271271
private static class DeletionResourceManagerHolder {
272-
private static Map<String, DeletionResourceManager> CONSENSU_GROUP_ID_2_INSTANCE_MAP;
272+
private static Map<Integer, DeletionResourceManager> CONSENSUS_GROUP_ID_2_INSTANCE_MAP;
273273

274274
private DeletionResourceManagerHolder() {}
275275

276276
public static void build() {
277-
if (CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
278-
CONSENSU_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
277+
if (CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) {
278+
CONSENSUS_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
279279
}
280280
}
281281
}
282282

283-
public static DeletionResourceManager getInstance(String groupId) {
283+
public static DeletionResourceManager getInstance(int groupId) {
284284
// If consensusImpl is not PipeConsensus.
285-
if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
285+
if (DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) {
286286
return null;
287287
}
288-
return DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
288+
return DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
289289
groupId,
290290
key -> {
291291
try {
@@ -305,10 +305,10 @@ public static void build() {
305305
}
306306

307307
public static void exit() {
308-
if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
308+
if (DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) {
309309
return;
310310
}
311-
DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.forEach(
311+
DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.forEach(
312312
(groupId, resourceManager) -> {
313313
resourceManager.close();
314314
});

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
7575
? 0
7676
: (o1.getProgressIndex().isAfter(o2.getProgressIndex()) ? 1 : -1));
7777
// Data region id
78-
private final String dataRegionId;
78+
private final int dataRegionId;
7979
// directory to store .deletion files
8080
private final String baseDirectory;
8181
// single thread to serialize WALEntry to workingBuffer
@@ -99,7 +99,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
9999
// maxProgressIndex of each batch increases in the same order as the physical time.
100100
private ProgressIndex maxProgressIndexInCurrentFile = MinimumProgressIndex.INSTANCE;
101101

102-
public PageCacheDeletionBuffer(String dataRegionId, String baseDirectory) {
102+
public PageCacheDeletionBuffer(int dataRegionId, String baseDirectory) {
103103
this.dataRegionId = dataRegionId;
104104
this.baseDirectory = baseDirectory;
105105
allocateBuffers();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ public class DeletionReader implements Closeable {
4040
private static final Logger LOGGER = LoggerFactory.getLogger(DeletionReader.class);
4141
private static final int MAGIC_STRING_BYTES_SIZE =
4242
DeletionResourceManager.MAGIC_VERSION_V1.getBytes(StandardCharsets.UTF_8).length;
43-
private final String regionId;
43+
private final int regionId;
4444
private final Consumer<DeletionResource> removeHook;
4545
private final File logFile;
4646
private final FileInputStream fileInputStream;
4747
private final FileChannel fileChannel;
4848

49-
public DeletionReader(File logFile, String regionId, Consumer<DeletionResource> removeHook)
49+
public DeletionReader(File logFile, int regionId, Consumer<DeletionResource> removeHook)
5050
throws IOException {
5151
this.logFile = logFile;
5252
this.regionId = regionId;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
4141

4242
private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class);
4343

44-
private final String dataRegionId;
44+
private final int dataRegionId;
4545

4646
private long timePublished;
4747
private long timeAssigned;
@@ -52,17 +52,17 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
5252
// The disruptor is usually nearly empty.
5353
private int disruptorSize;
5454

55-
private int extractorQueueTabletSize;
56-
private int extractorQueueTsFileSize;
57-
private int extractorQueueSize;
55+
private int sourceQueueTabletSize;
56+
private int sourceQueueTsFileSize;
57+
private int sourceQueueSize;
5858

59-
private int connectorQueueTabletSize;
60-
private int connectorQueueTsFileSize;
61-
private int connectorQueueSize;
59+
private int sinkQueueTabletSize;
60+
private int sinkQueueTsFileSize;
61+
private int sinkQueueSize;
6262

6363
private final boolean shouldPrintMessage;
6464

65-
public PipeHeartbeatEvent(final String dataRegionId, final boolean shouldPrintMessage) {
65+
public PipeHeartbeatEvent(final int dataRegionId, final boolean shouldPrintMessage) {
6666
super(null, 0, null, null, null, null, null, null, true, Long.MIN_VALUE, Long.MAX_VALUE);
6767
this.dataRegionId = dataRegionId;
6868
this.shouldPrintMessage = shouldPrintMessage;
@@ -72,7 +72,7 @@ public PipeHeartbeatEvent(
7272
final String pipeName,
7373
final long creationTime,
7474
final PipeTaskMeta pipeTaskMeta,
75-
final String dataRegionId,
75+
final int dataRegionId,
7676
final long timePublished,
7777
final boolean shouldPrintMessage) {
7878
super(
@@ -104,7 +104,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
104104
@Override
105105
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
106106
// PipeName == null indicates that the event is the raw event at disruptor,
107-
// not the event copied and passed to the extractor
107+
// not the event copied and passed to the source
108108
if (Objects.nonNull(pipeName)) {
109109
PipeDataNodeSinglePipeMetrics.getInstance()
110110
.decreaseHeartbeatEventCount(pipeName, creationTime);
@@ -208,17 +208,17 @@ public void recordDisruptorSize(final RingBuffer ringBuffer) {
208208

209209
public void recordExtractorQueueSize(final UnboundedBlockingPendingQueue<Event> pendingQueue) {
210210
if (shouldPrintMessage) {
211-
extractorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
212-
extractorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
213-
extractorQueueSize = pendingQueue.size();
211+
sourceQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
212+
sourceQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
213+
sourceQueueSize = pendingQueue.size();
214214
}
215215
}
216216

217217
public void recordConnectorQueueSize(final UnboundedBlockingPendingQueue<Event> pendingQueue) {
218218
if (shouldPrintMessage) {
219-
connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
220-
connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
221-
connectorQueueSize = pendingQueue.size();
219+
sinkQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
220+
sinkQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
221+
sinkQueueSize = pendingQueue.size();
222222
}
223223
}
224224

@@ -259,19 +259,19 @@ public String toString() {
259259

260260
final String disruptorSizeMessage = Integer.toString(disruptorSize);
261261

262-
final String extractorQueueTabletSizeMessage =
263-
timeAssigned != 0 ? Integer.toString(extractorQueueTabletSize) : unknownMessage;
264-
final String extractorQueueTsFileSizeMessage =
265-
timeAssigned != 0 ? Integer.toString(extractorQueueTsFileSize) : unknownMessage;
266-
final String extractorQueueSizeMessage =
267-
timeAssigned != 0 ? Integer.toString(extractorQueueSize) : unknownMessage;
262+
final String sourceQueueTabletSizeMessage =
263+
timeAssigned != 0 ? Integer.toString(sourceQueueTabletSize) : unknownMessage;
264+
final String sourceQueueTsFileSizeMessage =
265+
timeAssigned != 0 ? Integer.toString(sourceQueueTsFileSize) : unknownMessage;
266+
final String sourceQueueSizeMessage =
267+
timeAssigned != 0 ? Integer.toString(sourceQueueSize) : unknownMessage;
268268

269-
final String connectorQueueTabletSizeMessage =
270-
timeProcessed != 0 ? Integer.toString(connectorQueueTabletSize) : unknownMessage;
271-
final String connectorQueueTsFileSizeMessage =
272-
timeProcessed != 0 ? Integer.toString(connectorQueueTsFileSize) : unknownMessage;
273-
final String connectorQueueSizeMessage =
274-
timeProcessed != 0 ? Integer.toString(connectorQueueSize) : unknownMessage;
269+
final String sinkQueueTabletSizeMessage =
270+
timeProcessed != 0 ? Integer.toString(sinkQueueTabletSize) : unknownMessage;
271+
final String sinkQueueTsFileSizeMessage =
272+
timeProcessed != 0 ? Integer.toString(sinkQueueTsFileSize) : unknownMessage;
273+
final String sinkQueueSizeMessage =
274+
timeProcessed != 0 ? Integer.toString(sinkQueueSize) : unknownMessage;
275275

276276
return "PipeHeartbeatEvent{"
277277
+ "pipeName='"
@@ -290,18 +290,18 @@ public String toString() {
290290
+ totalTimeMessage
291291
+ ", disruptorSize="
292292
+ disruptorSizeMessage
293-
+ ", extractorQueueTabletSize="
294-
+ extractorQueueTabletSizeMessage
295-
+ ", extractorQueueTsFileSize="
296-
+ extractorQueueTsFileSizeMessage
297-
+ ", extractorQueueSize="
298-
+ extractorQueueSizeMessage
299-
+ ", connectorQueueTabletSize="
300-
+ connectorQueueTabletSizeMessage
301-
+ ", connectorQueueTsFileSize="
302-
+ connectorQueueTsFileSizeMessage
303-
+ ", connectorQueueSize="
304-
+ connectorQueueSizeMessage
293+
+ ", sourceQueueTabletSize="
294+
+ sourceQueueTabletSizeMessage
295+
+ ", sourceQueueTsFileSize="
296+
+ sourceQueueTsFileSizeMessage
297+
+ ", sourceQueueSize="
298+
+ sourceQueueSizeMessage
299+
+ ", sinkQueueTabletSize="
300+
+ sinkQueueTabletSizeMessage
301+
+ ", sinkQueueTsFileSize="
302+
+ sinkQueueTsFileSizeMessage
303+
+ ", sinkQueueSize="
304+
+ sinkQueueSizeMessage
305305
+ "}";
306306
}
307307
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838

3939
public class PipeCompactedTsFileInsertionEvent extends PipeTsFileInsertionEvent {
4040

41-
private final String dataRegionId;
41+
private final int dataRegionId;
4242
private final Set<String> originFilePaths;
4343
private final List<Long> commitIds;
4444

@@ -70,7 +70,7 @@ public PipeCompactedTsFileInsertionEvent(
7070
anyOfOriginalEvents.getStartTime(),
7171
anyOfOriginalEvents.getEndTime());
7272

73-
this.dataRegionId = String.valueOf(committerKey.getRegionId());
73+
this.dataRegionId = committerKey.getRegionId();
7474
this.originFilePaths =
7575
originalEvents.stream()
7676
.map(PipeTsFileInsertionEvent::getTsFile)

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,8 @@ public ProgressIndex forceGetProgressIndex() {
400400
public void eliminateProgressIndex() {
401401
if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
402402
PipeTsFileEpochProgressIndexKeeper.getInstance()
403-
.eliminateProgressIndex(resource.getDataRegionId(), pipeName, resource.getTsFilePath());
403+
.eliminateProgressIndex(
404+
Integer.parseInt(resource.getDataRegionId()), pipeName, resource.getTsFilePath());
404405
}
405406
}
406407

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public static PipeRealtimeEvent createRealtimeEvent(
5757
}
5858

5959
public static PipeRealtimeEvent createRealtimeEvent(
60-
final String dataRegionId, final boolean shouldPrintMessage) {
60+
final int dataRegionId, final boolean shouldPrintMessage) {
6161
return new PipeRealtimeEvent(
6262
new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null);
6363
}

0 commit comments

Comments
 (0)