Skip to content

Commit fbd59d2

Browse files
Vladsz83zstan
authored andcommitted
IGNITE-27660 SQL Calcite: Fix usage of checkState() in the join nodes - Fixes #12680.
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com> (cherry picked from commit bf8b18d)
1 parent 28e9b76 commit fbd59d2

5 files changed

Lines changed: 98 additions & 63 deletions

File tree

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ protected AbstractRightMaterializedJoinNode(ExecutionContext<Row> ctx, RelDataTy
8181
waitingLeft = 0;
8282
waitingRight = 0;
8383
left = null;
84+
processed = 0;
8485

8586
leftInBuf.clear();
8687
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.List;
2424
import java.util.Set;
2525
import java.util.function.BiPredicate;
26-
2726
import org.apache.calcite.rel.core.CorrelationId;
2827
import org.apache.calcite.rel.core.JoinRelType;
2928
import org.apache.calcite.rel.type.RelDataType;
@@ -77,6 +76,9 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
7776
/** */
7877
private int rightIdx;
7978

79+
/** */
80+
private int processed;
81+
8082
/** */
8183
private Row rightEmptyRow;
8284

@@ -129,8 +131,6 @@ public CorrelatedNestedLoopJoinNode(ExecutionContext<Row> ctx, RelDataType rowTy
129131
assert !F.isEmpty(sources()) && sources().size() == 2;
130132
assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested;
131133

132-
checkState();
133-
134134
requested = rowsCnt;
135135

136136
onRequest();
@@ -196,8 +196,6 @@ private void pushLeft(Row row) throws Exception {
196196
assert downstream() != null;
197197
assert waitingLeft > 0;
198198

199-
checkState();
200-
201199
waitingLeft--;
202200

203201
if (leftInBuf == null)
@@ -213,8 +211,6 @@ private void pushRight(Row row) throws Exception {
213211
assert downstream() != null;
214212
assert waitingRight > 0;
215213

216-
checkState();
217-
218214
waitingRight--;
219215

220216
if (rightInBuf == null)
@@ -269,9 +265,8 @@ private void onRequest() throws Exception {
269265
assert F.isEmpty(rightInBuf);
270266

271267
context().execute(() -> {
272-
checkState();
273-
274268
state = State.FILLING_LEFT;
269+
275270
leftSource().request(waitingLeft = leftInBufferSize);
276271
}, this::onError);
277272

@@ -282,11 +277,7 @@ private void onRequest() throws Exception {
282277
assert waitingRight == -1 || waitingRight == 0 && rightInBuf.size() == rightInBufferSize;
283278
assert waitingLeft == -1 || waitingLeft == 0 && leftInBuf.size() == leftInBufferSize;
284279

285-
context().execute(() -> {
286-
checkState();
287-
288-
join();
289-
}, this::onError);
280+
context().execute(this::join0, this::onError);
290281

291282
break;
292283

@@ -308,6 +299,8 @@ private void onPushLeft() throws Exception {
308299
if (leftInBuf.size() == leftInBufferSize) {
309300
assert waitingLeft == 0;
310301

302+
checkState();
303+
311304
prepareCorrelations();
312305

313306
if (waitingRight == -1)
@@ -384,7 +377,11 @@ private void join() throws Exception {
384377
leftIdx = 0;
385378

386379
while (requested > 0 && leftIdx < leftInBuf.size()) {
387-
checkState();
380+
if (processed++ > IN_BUFFER_SIZE) {
381+
context().execute(this::join0, this::onError);
382+
383+
return;
384+
}
388385

389386
Row left = leftInBuf.get(leftIdx);
390387
Row right = rightInBuf.get(rightIdx);
@@ -434,6 +431,7 @@ private void join() throws Exception {
434431

435432
try {
436433
while (requested > 0 && notMatchedIdx < leftInBuf.size()) {
434+
processed++;
437435
requested--;
438436

439437
downstream().push(handler.concat(leftInBuf.get(notMatchedIdx), rightEmptyRow));
@@ -500,4 +498,13 @@ private void prepareCorrelations() {
500498
context().setCorrelated(row, correlationIds.get(i).getId());
501499
}
502500
}
501+
502+
/** */
503+
private void join0() throws Exception {
504+
checkState();
505+
506+
processed = 0;
507+
508+
join();
509+
}
503510
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,11 @@ protected AbstractStoringHashJoin(
198198
hashStore.computeIfAbsent(key, k -> createRowList()).add(row);
199199
}
200200

201-
if (waitingRight == 0)
201+
if (waitingRight == 0) {
202+
checkState();
203+
202204
rightSource().request(waitingRight = IN_BUFFER_SIZE);
205+
}
203206
}
204207

205208
/** */

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
7777
/** */
7878
protected boolean inLoop;
7979

80+
/** */
81+
private int processed;
82+
8083
/**
8184
* Flag indicating that at least one of the inputs has exchange underneath. In this case we can't prematurely end
8285
* downstream if one of the inputs is drained, we need to wait for both inputs, since async message from remote
@@ -107,24 +110,16 @@ private MergeJoinNode(ExecutionContext<Row> ctx, RelDataType rowType, Comparator
107110
assert !F.isEmpty(sources()) && sources().size() == 2;
108111
assert rowsCnt > 0 && requested == 0;
109112

110-
checkState();
111-
112113
requested = rowsCnt;
113114

114115
if (!inLoop)
115-
context().execute(this::doJoin, this::onError);
116-
}
117-
118-
/** */
119-
private void doJoin() throws Exception {
120-
checkState();
121-
122-
join();
116+
context().execute(this::join0, this::onError);
123117
}
124118

125119
/** {@inheritDoc} */
126120
@Override protected void rewindInternal() {
127121
requested = 0;
122+
processed = 0;
128123
waitingLeft = 0;
129124
waitingRight = 0;
130125

@@ -184,8 +179,6 @@ private void pushLeft(Row row) throws Exception {
184179
assert downstream() != null;
185180
assert waitingLeft > 0;
186181

187-
checkState();
188-
189182
waitingLeft--;
190183

191184
if (!finishing)
@@ -199,8 +192,6 @@ private void pushRight(Row row) throws Exception {
199192
assert downstream() != null;
200193
assert waitingRight > 0;
201194

202-
checkState();
203-
204195
waitingRight--;
205196

206197
if (!finishing)
@@ -214,8 +205,6 @@ private void endLeft() throws Exception {
214205
assert downstream() != null;
215206
assert waitingLeft > 0;
216207

217-
checkState();
218-
219208
waitingLeft = NOT_WAITING;
220209

221210
join();
@@ -226,8 +215,6 @@ private void endRight() throws Exception {
226215
assert downstream() != null;
227216
assert waitingRight > 0;
228217

229-
checkState();
230-
231218
waitingRight = NOT_WAITING;
232219

233220
join();
@@ -339,7 +326,8 @@ public InnerJoin(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row>
339326
try {
340327
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty()
341328
|| rightMaterialization != null)) {
342-
checkState();
329+
if (rescheduleJoin())
330+
return;
343331

344332
if (left == null)
345333
left = leftInBuf.remove();
@@ -469,7 +457,8 @@ public LeftJoin(
469457
try {
470458
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty()
471459
|| rightMaterialization != null || waitingRight == NOT_WAITING)) {
472-
checkState();
460+
if (rescheduleJoin())
461+
return;
473462

474463
if (left == null) {
475464
left = leftInBuf.remove();
@@ -622,7 +611,8 @@ public RightJoin(
622611
try {
623612
while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING)
624613
&& (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) {
625-
checkState();
614+
if (rescheduleJoin())
615+
return;
626616

627617
if (left == null && !leftInBuf.isEmpty())
628618
left = leftInBuf.remove();
@@ -796,7 +786,8 @@ public FullOuterJoin(
796786
try {
797787
while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING)
798788
&& !(right == null && rightInBuf.isEmpty() && rightMaterialization == null && waitingRight != NOT_WAITING)) {
799-
checkState();
789+
if (rescheduleJoin())
790+
return;
800791

801792
if (left == null && !leftInBuf.isEmpty()) {
802793
left = leftInBuf.remove();
@@ -975,7 +966,8 @@ public SemiJoin(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row>
975966
inLoop = true;
976967
try {
977968
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty())) {
978-
checkState();
969+
if (rescheduleJoin())
970+
return;
979971

980972
if (left == null)
981973
left = leftInBuf.remove();
@@ -1031,7 +1023,8 @@ public AntiJoin(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row>
10311023
try {
10321024
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) &&
10331025
!(right == null && rightInBuf.isEmpty() && waitingRight != NOT_WAITING)) {
1034-
checkState();
1026+
if (rescheduleJoin())
1027+
return;
10351028

10361029
if (left == null)
10371030
left = leftInBuf.remove();
@@ -1070,4 +1063,24 @@ else if (cmp > 0) {
10701063
tryToRequestInputs();
10711064
}
10721065
}
1066+
1067+
/** */
1068+
private void join0() throws Exception {
1069+
checkState();
1070+
1071+
processed = 0;
1072+
1073+
join();
1074+
}
1075+
1076+
/** */
1077+
protected boolean rescheduleJoin() {
1078+
if (processed++ > IN_BUFFER_SIZE) {
1079+
context().execute(this::join0, this::onError);
1080+
1081+
return true;
1082+
}
1083+
1084+
return false;
1085+
}
10731086
}

0 commit comments

Comments
 (0)