Skip to content

Commit 0011eb7

Browse files
PengznaJackieTien97
authored andcommitted
fix: pick deletion event for historical resend (#17329)
1 parent bf25167 commit 0011eb7

2 files changed

Lines changed: 21 additions & 1 deletion

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,15 @@ public synchronized void removeEventFromBuffer(EnrichedEvent event) {
236236
while (!current.equalsInPipeConsensus(event) && iterator.hasNext()) {
237237
current = iterator.next();
238238
}
239-
iterator.remove();
239+
if (current.equalsInIoTConsensusV2(event)) {
240+
iterator.remove();
241+
} else {
242+
LOGGER.warn(
243+
"IoTConsensusV2-ConsensusGroup-{}: event-{} not found in transferBuffer, skip removing. queue size = {}",
244+
consensusGroupId,
245+
event,
246+
transferBuffer.size());
247+
}
240248
// update replicate progress
241249
currentReplicateProgress =
242250
Math.max(currentReplicateProgress, event.getReplicateIndexForIoTV2());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,6 +929,18 @@ private Event supplyDeletionEvent(final DeletionResource deletionResource) {
929929
skipIfNoPrivileges,
930930
false);
931931

932+
// if using IoTV2, assign a replicateIndex for this historical deletion event
933+
if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
934+
&& IoTConsensusV2Processor.isShouldReplicate(event)) {
935+
event.setReplicateIndexForIoTV2(
936+
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
937+
LOGGER.debug(
938+
"[{}]Set {} for historical deletion event {}",
939+
pipeName,
940+
event.getReplicateIndexForIoTV2(),
941+
event);
942+
}
943+
932944
if (sloppyPattern || isDbNameCoveredByPattern) {
933945
event.skipParsingPattern();
934946
}

0 commit comments

Comments
 (0)