Skip to content

Commit 7e15f04

Browse files
alex-plekhanovzstan
authored andcommitted
IGNITE-27738 SQL Calcite: Fix planner hang on multi-row values - Fixes #12697.
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com> (cherry picked from commit f0964f0)
1 parent fbd59d2 commit 7e15f04

8 files changed

Lines changed: 253 additions & 67 deletions

File tree

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteColocatedSortAggregate.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.util.List;
2121
import java.util.Objects;
22-
2322
import org.apache.calcite.plan.RelOptCluster;
2423
import org.apache.calcite.plan.RelOptCost;
2524
import org.apache.calcite.plan.RelOptPlanner;
@@ -74,8 +73,13 @@ public IgniteColocatedSortAggregate(RelInput input) {
7473
/** {@inheritDoc} */
7574
@Override public Aggregate copy(RelTraitSet traitSet, RelNode input, ImmutableBitSet groupSet,
7675
List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
76+
RelCollation collation = TraitUtils.collation(input.getTraitSet());
77+
78+
assert collation.satisfies(TraitUtils.collation(traitSet))
79+
: "Unexpected collations: input=" + collation + ", traitSet=" + TraitUtils.collation(traitSet);
80+
7781
return new IgniteColocatedSortAggregate(
78-
getCluster(), traitSet, input, groupSet, groupSets, aggCalls, TraitUtils.collation(traitSet));
82+
getCluster(), traitSet, input, groupSet, groupSets, aggCalls, collation);
7983
}
8084

8185
/** {@inheritDoc} */

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteMapSortAggregate.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,15 @@ public IgniteMapSortAggregate(RelInput input) {
8282
RelNode input,
8383
ImmutableBitSet groupSet,
8484
List<ImmutableBitSet> groupSets,
85-
List<AggregateCall> aggCalls) {
85+
List<AggregateCall> aggCalls
86+
) {
87+
RelCollation collation = TraitUtils.collation(input.getTraitSet());
88+
89+
assert collation.satisfies(TraitUtils.collation(traitSet))
90+
: "Unexpected collations: input=" + collation + ", traitSet=" + TraitUtils.collation(traitSet);
91+
8692
return new IgniteMapSortAggregate(
87-
getCluster(), traitSet, input, groupSet, groupSets, aggCalls, TraitUtils.collation(traitSet));
93+
getCluster(), traitSet, input, groupSet, groupSets, aggCalls, collation);
8894
}
8995

9096
/** {@inheritDoc} */

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceSortAggregate.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.util.List;
2121
import java.util.Objects;
22-
2322
import org.apache.calcite.plan.RelOptCluster;
2423
import org.apache.calcite.plan.RelOptCost;
2524
import org.apache.calcite.plan.RelOptPlanner;
@@ -75,6 +74,11 @@ public IgniteReduceSortAggregate(RelInput input) {
7574

7675
/** {@inheritDoc} */
7776
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
77+
RelCollation collation = TraitUtils.collation(sole(inputs).getTraitSet());
78+
79+
assert collation.satisfies(TraitUtils.collation(traitSet))
80+
: "Unexpected collations: input=" + collation + ", traitSet=" + TraitUtils.collation(traitSet);
81+
7882
return new IgniteReduceSortAggregate(
7983
getCluster(),
8084
traitSet,
@@ -83,7 +87,7 @@ public IgniteReduceSortAggregate(RelInput input) {
8387
groupSets,
8488
aggCalls,
8589
rowType,
86-
TraitUtils.collation(traitSet)
90+
collation
8791
);
8892
}
8993

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.ignite.internal.processors.query.calcite.trait;
1919

2020
import java.math.BigDecimal;
21+
import java.util.ArrayList;
22+
import java.util.BitSet;
2123
import java.util.Collection;
2224
import java.util.Collections;
2325
import java.util.HashMap;
@@ -29,6 +31,7 @@
2931
import com.google.common.collect.ImmutableList;
3032
import com.google.common.collect.ImmutableSet;
3133
import org.apache.calcite.linq4j.Ord;
34+
import org.apache.calcite.plan.AbstractRelOptPlanner;
3235
import org.apache.calcite.plan.RelOptCluster;
3336
import org.apache.calcite.plan.RelOptPlanner;
3437
import org.apache.calcite.plan.RelOptRule;
@@ -61,6 +64,7 @@
6164
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
6265
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
6366
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
67+
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
6468
import org.apache.ignite.internal.util.typedef.F;
6569
import org.jetbrains.annotations.Nullable;
6670

@@ -167,6 +171,7 @@ else if (converter == RewindabilityTraitDef.INSTANCE)
167171
RelOptRule.convert(
168172
rel,
169173
rel.getTraitSet()
174+
.replace(RewindabilityTrait.ONE_WAY)
170175
.replace(CorrelationTrait.UNCORRELATED)
171176
),
172177
toTrait);
@@ -419,15 +424,58 @@ public static Pair<RelTraitSet, List<RelTraitSet>> passThrough(TraitsAwareIgnite
419424

420425
assert traits.size() <= 1;
421426

427+
if (!traits.isEmpty() && traits.get(0).left.satisfies(requiredTraits)) {
428+
// Return most relaxed parent traits.
429+
return Pair.of(requiredTraits, traits.get(0).right);
430+
}
431+
422432
return F.first(traits);
423433
}
424434

435+
/** */
436+
public static List<RelTraitSet> removeDuplicates(List<RelTraitSet> traits) {
437+
BitSet duplicates = null;
438+
439+
for (int i = 0; i < traits.size() - 1; i++) {
440+
if (duplicates != null && duplicates.get(i))
441+
continue;
442+
443+
for (int j = i + 1; j < traits.size(); j++) {
444+
if (duplicates != null && duplicates.get(j))
445+
continue;
446+
447+
// Return most strict child traits.
448+
if (traits.get(i).satisfies(traits.get(j)))
449+
(duplicates == null ? duplicates = new BitSet() : duplicates).set(j);
450+
else if (traits.get(j).satisfies(traits.get(i))) {
451+
(duplicates == null ? duplicates = new BitSet() : duplicates).set(i);
452+
break;
453+
}
454+
}
455+
}
456+
457+
if (duplicates == null)
458+
return traits;
459+
460+
List<RelTraitSet> newTraits = new ArrayList<>(traits.size() - duplicates.cardinality());
461+
462+
for (int i = 0; i < traits.size(); i++) {
463+
if (!duplicates.get(i))
464+
newTraits.add(traits.get(i));
465+
}
466+
467+
return newTraits;
468+
}
469+
425470
/** */
426471
public static List<RelNode> derive(TraitsAwareIgniteRel rel, List<List<RelTraitSet>> inTraits) {
427472
assert !F.isEmpty(inTraits);
428473

429474
RelTraitSet outTraits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE);
430-
Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations = combinations(outTraits, inTraits);
475+
476+
inTraits = Commons.transform(inTraits, TraitUtils::removeDuplicates);
477+
478+
Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations = combinations(rel, outTraits, inTraits);
431479

432480
if (combinations.isEmpty())
433481
return ImmutableList.of();
@@ -448,21 +496,33 @@ private static <T> List<T> singletonListFromNullable(@Nullable T elem) {
448496
}
449497

450498
/** */
451-
private static Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations(RelTraitSet outTraits, List<List<RelTraitSet>> inTraits) {
499+
private static Set<Pair<RelTraitSet, List<RelTraitSet>>> combinations(
500+
TraitsAwareIgniteRel rel,
501+
RelTraitSet outTraits,
502+
List<List<RelTraitSet>> inTraits
503+
) {
452504
Set<Pair<RelTraitSet, List<RelTraitSet>>> out = new HashSet<>();
453-
fillRecursive(outTraits, inTraits, out, new RelTraitSet[inTraits.size()], 0);
505+
fillRecursive(rel, outTraits, inTraits, out, new RelTraitSet[inTraits.size()], 0);
454506
return out;
455507
}
456508

457509
/** */
458510
private static boolean fillRecursive(
511+
TraitsAwareIgniteRel rel,
459512
RelTraitSet outTraits,
460513
List<List<RelTraitSet>> inTraits,
461514
Set<Pair<RelTraitSet, List<RelTraitSet>>> result,
462515
RelTraitSet[] combination,
463516
int idx
464517
) throws ControlFlowException {
465518
boolean processed = false, last = idx == inTraits.size() - 1;
519+
520+
if (last) {
521+
assert rel.getCluster().getPlanner() instanceof AbstractRelOptPlanner;
522+
523+
((AbstractRelOptPlanner)rel.getCluster().getPlanner()).checkCancel();
524+
}
525+
466526
for (RelTraitSet t : inTraits.get(idx)) {
467527
assert t.getConvention() == IgniteConvention.INSTANCE;
468528

@@ -471,7 +531,7 @@ private static boolean fillRecursive(
471531

472532
if (last)
473533
result.add(Pair.of(outTraits, ImmutableList.copyOf(combination)));
474-
else if (!fillRecursive(outTraits, inTraits, result, combination, idx + 1))
534+
else if (!fillRecursive(rel, outTraits, inTraits, result, combination, idx + 1))
475535
return false;
476536
}
477537
return processed;

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SortAggregateIntegrationTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,21 @@ public void correctCollationsOnMapReduceSortAgg() throws InterruptedException {
140140
assertEquals(ROWS, cursors.size());
141141
}
142142

143+
/** */
144+
@Test
145+
public void testNullsReordering() {
146+
sql("CREATE TABLE t(a INTEGER, b INTEGER) WITH " + atomicity());
147+
sql("INSERT INTO t VALUES (1, 1), (2, 2), (1, 3), (3, 4), (NULL, 1), (1, NULL)");
148+
149+
assertQuery("SELECT a, SUM(b), COUNT(b), COUNT(*) FROM t GROUP BY a ORDER BY a NULLS LAST")
150+
.ordered()
151+
.returns(1, 4L, 2L, 3L)
152+
.returns(2, 2L, 1L, 1L)
153+
.returns(3, 4L, 1L, 1L)
154+
.returns(null, 1L, 1L, 1L)
155+
.check();
156+
}
157+
143158
/**
144159
* @param c Cache.
145160
* @param rows Rows count.

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDmlIntegrationTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.time.Duration;
2424
import java.time.Period;
2525
import java.util.Arrays;
26+
import java.util.Collections;
2627
import java.util.List;
2728
import java.util.Objects;
2829
import java.util.UUID;
@@ -657,6 +658,20 @@ public void testInsertIncorrectDate() {
657658
assertThrows("INSERT INTO timestamp_t VALUES ('1900-1-1 00-00-00')", errType, errDate);
658659
}
659660

661+
/** */
662+
@Test
663+
public void testInsertMultiRowValues() {
664+
sql("CREATE TABLE test (id int, val int) WITH " + atomicity());
665+
666+
int rowsCnt = 50;
667+
668+
String sql = "INSERT INTO test VALUES " + String.join(", ", Collections.nCopies(rowsCnt, "(?, ?)"));
669+
670+
sql(sql, new Object[rowsCnt * 2]);
671+
672+
assertQuery("SELECT * FROM test").resultSize(rowsCnt).check();
673+
}
674+
660675
/** */
661676
private void checkDefaultValue(String sqlType, String sqlVal, Object expectedVal) {
662677
try {

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.ignite.internal.util.typedef.internal.CU;
3434
import org.junit.Test;
3535

36+
import static org.apache.calcite.sql.type.SqlTypeName.INTEGER;
3637
import static org.hamcrest.CoreMatchers.nullValue;
3738
import static org.junit.Assert.assertThat;
3839

@@ -209,4 +210,31 @@ public void joinBroadcastAggregateRehash() throws Exception {
209210
)
210211
);
211212
}
213+
214+
/**
215+
* Re-hashing right hand for merge join.
216+
*/
217+
@Test
218+
public void joinMergeJoinAffinityRehash() throws Exception {
219+
IgniteSchema schema = createSchema(
220+
createTable("ORDERS", IgniteDistributions.affinity(0, "orders", "hash"),
221+
"ID", INTEGER, "REGION", INTEGER)
222+
.addIndex("ORDER_ID_IDX", 0),
223+
createTable("ORDER_ITEMS", IgniteDistributions.affinity(0, "order_items", "hash"),
224+
"ID", INTEGER, "ORDER_ID", INTEGER, "AMOUNT", INTEGER)
225+
.addIndex("ORDER_ITEMS_ORDER_ID_IDX", 1)
226+
);
227+
228+
String sql = "SELECT sum(amount)" +
229+
" FROM order_items i JOIN orders o ON o.id=i.order_id" +
230+
" WHERE o.region = ?";
231+
232+
assertPlan(sql, schema,
233+
nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class))
234+
.and(hasChildThat(isIndexScan("ORDERS", "ORDER_ID_IDX")))
235+
.and(hasChildThat(isInstanceOf(IgniteExchange.class)
236+
.and(hasDistribution(IgniteDistributions.affinity(0, "orders", "hash")))
237+
.and(hasChildThat(isIndexScan("ORDER_ITEMS", "ORDER_ITEMS_ORDER_ID_IDX")))))
238+
);
239+
}
212240
}

0 commit comments

Comments
 (0)