1919
2020package org .apache .iotdb .db .queryengine .execution .exchange ;
2121
22+ import org .apache .iotdb .commons .utils .TestOnly ;
2223import org .apache .iotdb .db .conf .IoTDBDescriptor ;
2324import org .apache .iotdb .db .queryengine .common .FragmentInstanceId ;
2425import org .apache .iotdb .db .queryengine .execution .exchange .sink .LocalSinkChannel ;
2526import org .apache .iotdb .db .queryengine .execution .exchange .source .LocalSourceHandle ;
2627import org .apache .iotdb .db .queryengine .execution .memory .LocalMemoryManager ;
28+ import org .apache .iotdb .db .queryengine .execution .memory .MemoryPool .MemoryReservationResult ;
2729import org .apache .iotdb .db .utils .CommonUtils ;
2830import org .apache .iotdb .mpp .rpc .thrift .TFragmentInstanceId ;
2931
@@ -63,7 +65,7 @@ public class SharedTsBlockQueue {
6365
6466 private long bufferRetainedSizeInBytes = 0L ;
6567
66- private final Queue <TsBlock > queue = new LinkedList <>();
68+ private final Queue <Pair < TsBlock , Long > > queue = new LinkedList <>();
6769
6870 private SettableFuture <Void > blocked = SettableFuture .create ();
6971
@@ -83,17 +85,28 @@ public class SharedTsBlockQueue {
8385
8486 private long maxBytesCanReserve =
8587 IoTDBDescriptor .getInstance ().getMemoryConfig ().getMaxBytesPerFragmentInstance ();
88+ private final boolean isHighestPriority ;
8689
8790 private volatile Throwable abortedCause = null ;
8891
8992 // used for SharedTsBlockQueue listener
9093 private final ExecutorService executorService ;
9194
95+ @ TestOnly
9296 public SharedTsBlockQueue (
9397 TFragmentInstanceId fragmentInstanceId ,
9498 String planNodeId ,
9599 LocalMemoryManager localMemoryManager ,
96100 ExecutorService executorService ) {
101+ this (fragmentInstanceId , planNodeId , localMemoryManager , executorService , false );
102+ }
103+
104+ public SharedTsBlockQueue (
105+ TFragmentInstanceId fragmentInstanceId ,
106+ String planNodeId ,
107+ LocalMemoryManager localMemoryManager ,
108+ ExecutorService executorService ,
109+ boolean isHighestPriority ) {
97110 this .localFragmentInstanceId =
98111 Validate .notNull (fragmentInstanceId , "fragment instance ID cannot be null" );
99112 this .fullFragmentInstanceId =
@@ -102,6 +115,7 @@ public SharedTsBlockQueue(
102115 this .localMemoryManager =
103116 Validate .notNull (localMemoryManager , "local memory manager cannot be null" );
104117 this .executorService = Validate .notNull (executorService , "ExecutorService can not be null." );
118+ this .isHighestPriority = isHighestPriority ;
105119 }
106120
107121 public boolean hasNoMoreTsBlocks () {
@@ -196,15 +210,18 @@ public TsBlock remove() {
196210 }
197211 throw new IllegalStateException ("queue has been destroyed" );
198212 }
199- TsBlock tsBlock = queue .remove ();
200- localMemoryManager
201- .getQueryPool ()
202- .free (
203- localFragmentInstanceId .getQueryId (),
204- fullFragmentInstanceId ,
205- localPlanNodeId ,
206- tsBlock .getSizeInBytes ());
207- bufferRetainedSizeInBytes -= tsBlock .getSizeInBytes ();
213+ Pair <TsBlock , Long > tsBlockWithReservedBytes = queue .remove ();
214+ long reservedBytes = tsBlockWithReservedBytes .right ;
215+ if (reservedBytes > 0 ) {
216+ localMemoryManager
217+ .getQueryPool ()
218+ .free (
219+ localFragmentInstanceId .getQueryId (),
220+ fullFragmentInstanceId ,
221+ localPlanNodeId ,
222+ reservedBytes );
223+ bufferRetainedSizeInBytes -= reservedBytes ;
224+ }
208225 // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event
209226 // to
210227 // corresponding LocalSinkChannel.
@@ -214,7 +231,7 @@ public TsBlock remove() {
214231 if (blocked .isDone () && queue .isEmpty () && !noMoreTsBlocks ) {
215232 blocked = SettableFuture .create ();
216233 }
217- return tsBlock ;
234+ return tsBlockWithReservedBytes . left ;
218235 }
219236
220237 /**
@@ -240,20 +257,22 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
240257 localFragmentInstanceId .queryId , fullFragmentInstanceId , localPlanNodeId );
241258 alreadyRegistered = true ;
242259 }
243- Pair < ListenableFuture < Void >, Boolean > pair =
260+ MemoryReservationResult reserveResult =
244261 localMemoryManager
245262 .getQueryPool ()
246- .reserve (
263+ .reserveWithPriority (
247264 localFragmentInstanceId .getQueryId (),
248265 fullFragmentInstanceId ,
249266 localPlanNodeId ,
250267 tsBlock .getSizeInBytes (),
251- maxBytesCanReserve );
252- blockedOnMemory = pair .left ;
253- bufferRetainedSizeInBytes += tsBlock .getSizeInBytes ();
268+ maxBytesCanReserve ,
269+ isHighestPriority );
270+ blockedOnMemory = reserveResult .getFuture ();
271+ long reservedBytes = reserveResult .getReservedBytes ();
272+ bufferRetainedSizeInBytes += reservedBytes ;
254273
255274 // reserve memory failed, we should wait until there is enough memory
256- if (!Boolean . TRUE . equals ( pair . right )) {
275+ if (!reserveResult . isReserveSuccess ( )) {
257276 SettableFuture <Void > channelBlocked = SettableFuture .create ();
258277 blockedOnMemory .addListener (
259278 () -> {
@@ -268,7 +287,7 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
268287 channelBlocked .set (null );
269288 return ;
270289 }
271- queue .add (tsBlock );
290+ queue .add (new Pair <>( tsBlock , reservedBytes ) );
272291 if (!blocked .isDone ()) {
273292 blocked .set (null );
274293 }
@@ -285,7 +304,7 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
285304 executorService );
286305 return channelBlocked ;
287306 } else { // reserve memory succeeded, add the TsBlock directly
288- queue .add (tsBlock );
307+ queue .add (new Pair <>( tsBlock , reservedBytes ) );
289308 if (!blocked .isDone ()) {
290309 blocked .set (null );
291310 }
0 commit comments