Skip to content

Commit b5199bd

Browse files
authored
Pipe: Optimized the memory occupation of pipe realtime source (#17450)
* fix * fix * fix
1 parent c1d16a4 commit b5199bd

5 files changed

Lines changed: 14 additions & 7 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,6 @@ private void retryTransfer(final PipeTsFileInsertionEvent tsFileInsertionEvent)
668668
addFailureEventToRetryQueue(tsFileInsertionEvent, null);
669669
}
670670
} catch (final Exception e) {
671-
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
672671
addFailureEventToRetryQueue(tsFileInsertionEvent, e);
673672
}
674673
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,7 @@ public void transfer(
153153
}
154154

155155
if (reader == null) {
156-
reader =
157-
Objects.nonNull(modFile)
158-
? new RandomAccessFile(modFile, "r")
159-
: new RandomAccessFile(tsFile, "r");
156+
reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r");
160157
}
161158

162159
this.clientManager = clientManager;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,13 @@ protected void extractHeartbeat(final PipeRealtimeEvent event) {
427427
}
428428

429429
protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
430+
// Remove any heartbeat events in front of this event to avoid OOM
431+
// Since the batch and retry queue no longer need the heartbeat event to trigger
432+
// And the progress report event can trigger the processor calculation because it's not reported
433+
// yet
434+
while (((PipeRealtimeEvent) pendingQueue.peekLast()).getEvent() instanceof PipeHeartbeatEvent) {
435+
pendingQueue.pollLast();
436+
}
430437
if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
431438
final ProgressReportEvent oldEvent = (ProgressReportEvent) pendingQueue.peekLast();
432439
oldEvent.bindProgressIndex(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,8 @@ public UnboundedBlockingPendingQueue(final PipeEventCounter eventCounter) {
3737
public E peekLast() {
3838
return pendingDeque.peekLast();
3939
}
40+
41+
public E pollLast() {
42+
return pendingDeque.pollLast();
43+
}
4044
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -798,15 +798,15 @@ private TPipeTransferResp checkNonFinalFileSeal(
798798
String.format(
799799
"Failed to seal file %s, because the length of file is not correct. "
800800
+ "The original file has length %s, but receiver file has length %s.",
801-
fileName, fileLength, writingFileWriter.length()));
801+
fileName, fileLength, file.length()));
802802
PipeLogger.log(
803803
LOGGER::warn,
804804
"Receiver id = %s: Failed to seal file %s, because the length of file is not correct. "
805805
+ "The original file has length %s, but receiver file has length %s.",
806806
receiverId.get(),
807807
fileName,
808808
fileLength,
809-
writingFileWriter.length());
809+
file.length());
810810
return new TPipeTransferResp(status);
811811
}
812812

0 commit comments

Comments
 (0)