Skip to content

Commit 15f2122

Browse files
authored
IGNITE-28352 Calcite. User defined sql function miss entries are written under the same tx lock (#12936)
1 parent edfc743 commit 15f2122

4 files changed

Lines changed: 225 additions & 2 deletions

File tree

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
145145
/** */
146146
private Object[] correlations = new Object[16];
147147

148+
/** Entries holder per execution thread. */
149+
private static final ThreadLocal<Collection<QueryTxEntry>> txEntriesHolder = new ThreadLocal<>();
150+
148151
/**
149152
* @param qctx Parent base query context.
150153
* @param qryId Query ID.
@@ -182,7 +185,7 @@ public ExecutionContext(
182185
this.ioTracker = ioTracker;
183186
this.params = params;
184187
this.timeout = timeout;
185-
this.qryTxEntries = qryTxEntries;
188+
this.qryTxEntries = qryTxEntries == null ? txEntriesHolder.get() : qryTxEntries;
186189

187190
startTs = U.currentTimeMillis();
188191

@@ -421,12 +424,17 @@ public void execute(RunnableX task, Consumer<Throwable> onError) {
421424

422425
executor.execute(qryId, fragmentId(), () -> {
423426
try {
427+
txEntriesHolder.set(qryTxEntries);
428+
424429
if (!isCancelled())
425430
task.run();
426431
}
427432
catch (Throwable e) {
428433
onError.accept(e);
429434
}
435+
finally {
436+
txEntriesHolder.remove();
437+
}
430438
});
431439
}
432440

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.query.calcite.integration;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import org.apache.ignite.Ignite;
23+
import org.apache.ignite.IgniteCache;
24+
import org.apache.ignite.IgniteCheckedException;
25+
import org.apache.ignite.Ignition;
26+
import org.apache.ignite.cache.QueryEntity;
27+
import org.apache.ignite.cache.query.SqlFieldsQuery;
28+
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
29+
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
30+
import org.apache.ignite.configuration.CacheConfiguration;
31+
import org.apache.ignite.configuration.IgniteConfiguration;
32+
import org.apache.ignite.internal.IgniteInternalFuture;
33+
import org.apache.ignite.internal.util.typedef.F;
34+
import org.apache.ignite.testframework.GridTestUtils;
35+
import org.apache.ignite.testframework.junits.WithSystemProperty;
36+
import org.apache.ignite.transactions.Transaction;
37+
import org.junit.Test;
38+
39+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
40+
import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
41+
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
42+
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
43+
import static org.hamcrest.CoreMatchers.equalTo;
44+
import static org.junit.Assert.assertThat;
45+
46+
/**
47+
* Integration test for user defined functions with tx aware.
48+
*/
49+
@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, value = "true")
50+
public class UserDefinedTxAwareFunctionsIntegrationTest extends AbstractBasicIntegrationTest {
51+
/** */
52+
private static final int THREAD_NUM = 10;
53+
54+
/** {@inheritDoc} */
55+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
56+
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
57+
58+
cfg.getSqlConfiguration().setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration());
59+
cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true);
60+
cfg.setQueryThreadPoolSize(2 * THREAD_NUM + 1);
61+
62+
return cfg;
63+
}
64+
65+
/** Check tx aware UDF execution results. */
66+
@Test
67+
public void testTxAwareUserDefinedFunc() {
68+
assertTrue(nodeCount() > 1);
69+
int nodeCnt = nodeCount();
70+
71+
List<List<Object>> refResults = new ArrayList<>();
72+
73+
IgniteCache<Integer, Object> cache = client.getOrCreateCache(cacheConfig());
74+
75+
refResults.add(List.of(0, Integer.toString(0)));
76+
// Insert outside tx.
77+
cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) VALUES (?, ?)").setArgs(0, 0)).getAll();
78+
79+
try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
80+
for (int i = 1; i < 2 * nodeCnt; ++i) {
81+
refResults.add(List.of(i, Integer.toString(i)));
82+
cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) VALUES (?, ?)").setArgs(i, i)).getAll();
83+
}
84+
85+
// Simple select without UDF.
86+
List<List<?>> selectResult = cache
87+
.query(new SqlFieldsQuery("SELECT id, name FROM Employer ORDER BY id"))
88+
.getAll();
89+
90+
assertThat(selectResult, equalTo(refResults));
91+
92+
// Select with UDF.
93+
List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT customTableFunc() AS result")).getAll();
94+
95+
assertThat(res.get(0).get(0), equalTo(refResults));
96+
97+
// Select with nested UDF.
98+
res = cache.query(new SqlFieldsQuery("SELECT customNestedTableFunc() AS result")).getAll();
99+
100+
assertThat(res.get(0).get(0), equalTo(refResults));
101+
102+
// UDF participate in DML.
103+
cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) VALUES (100, nameAsStr(1))")).getAll();
104+
105+
res = cache.query(new SqlFieldsQuery("SELECT name FROM Employer WHERE id = 100")).getAll();
106+
107+
assertEquals("1", res.get(0).get(0));
108+
109+
for (int i = 0; i < 2 * nodeCnt; ++i) {
110+
// A bit different case of UDF.
111+
List<List<?>> res1 = cache.query(new SqlFieldsQuery("SELECT nameTableFunc(?) AS result").setArgs(i)).getAll();
112+
113+
List<List<?>> res2 = (List<List<?>>)res1.get(0).get(0);
114+
115+
assertThat(res2.get(0).get(0), equalTo(Integer.toString(i)));
116+
}
117+
118+
tx.commit();
119+
}
120+
}
121+
122+
/** */
123+
@Test
124+
public void testIsolationCorrectnessWithUdf() throws IgniteCheckedException {
125+
assertTrue(nodeCount() > 1);
126+
int nodeCnt = nodeCount();
127+
128+
IgniteCache<Integer, Object> cache = client.getOrCreateCache(cacheConfig());
129+
130+
/* The pool size should be greater than the maximum number of concurrent queries initiated by UDFs. */
131+
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
132+
for (int iter = 0; iter < 10; ++iter) {
133+
try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
134+
List<List<Object>> refResults = new ArrayList<>();
135+
136+
for (int i = 0; i < 2 * nodeCnt; ++i) {
137+
refResults.add(List.of(i, Integer.toString(i)));
138+
cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) VALUES (?, ?)").setArgs(i, i)).getAll();
139+
}
140+
141+
List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT customNestedTableFunc() AS result")).getAll();
142+
143+
assertThat(res.get(0).get(0), equalTo(refResults));
144+
145+
tx.rollback();
146+
}
147+
}
148+
}, THREAD_NUM, "calcite-tx-with-udf");
149+
150+
fut.get(30_000);
151+
}
152+
153+
/** */
154+
private CacheConfiguration<Integer, Object> cacheConfig() {
155+
return this.<Integer, Object>cacheConfiguration()
156+
.setName(DEFAULT_CACHE_NAME)
157+
.setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class)
158+
.setTableName("Employer")
159+
.addQueryField("ID", Integer.class.getName(), null)
160+
.setKeyFieldName("ID")
161+
))
162+
.setSqlFunctionClasses(InnerSqlFunctionsLibrary.class)
163+
.setAtomicityMode(TRANSACTIONAL);
164+
}
165+
166+
/** */
167+
public static class InnerSqlFunctionsLibrary {
168+
/** */
169+
@QuerySqlFunction
170+
public List<List<?>> customTableFunc() {
171+
Ignite ignite = Ignition.localIgnite();
172+
173+
return ignite.cache(DEFAULT_CACHE_NAME)
174+
.query(new SqlFieldsQuery("SELECT id, name FROM Employer ORDER BY id"))
175+
.getAll();
176+
}
177+
178+
/** */
179+
@QuerySqlFunction
180+
public List<List<?>> customNestedTableFunc() {
181+
Ignite ignite = Ignition.localIgnite();
182+
183+
Object res = ignite.cache(DEFAULT_CACHE_NAME)
184+
.query(new SqlFieldsQuery("SELECT customTableFunc() AS result"))
185+
.getAll().get(0).get(0);
186+
187+
return (List<List<?>>)res;
188+
}
189+
190+
/** */
191+
@QuerySqlFunction
192+
public static List<List<?>> nameTableFunc(int id) {
193+
Ignite ignite = Ignition.localIgnite();
194+
195+
return ignite.cache(DEFAULT_CACHE_NAME)
196+
.query(new SqlFieldsQuery("SELECT name FROM Employer WHERE id = ?").setArgs(id))
197+
.getAll();
198+
}
199+
200+
/** */
201+
@QuerySqlFunction
202+
public static String nameAsStr(int id) {
203+
Ignite ignite = Ignition.localIgnite();
204+
205+
List<List<?>> res = ignite.cache(DEFAULT_CACHE_NAME)
206+
.query(new SqlFieldsQuery("SELECT name FROM Employer WHERE id = ?").setArgs(id))
207+
.getAll();
208+
209+
return (String)res.get(0).get(0);
210+
}
211+
}
212+
}

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,8 @@ private Node<Object[]> implementFragment(
353353
NoOpIoTracker.INSTANCE,
354354
0,
355355
Commons.parametersMap(ctx.parameters()),
356-
null);
356+
null
357+
);
357358

358359
return new LogicalRelImplementor<>(ectx, c -> r -> 0, mailboxRegistry, exchangeSvc,
359360
new TestFailureProcessor(kernal)).go(fragment.root());

modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.apache.ignite.internal.processors.query.calcite.integration.UserDdlIntegrationTest;
8585
import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTest;
8686
import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTransactionalTest;
87+
import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedTxAwareFunctionsIntegrationTest;
8788
import org.apache.ignite.internal.processors.query.calcite.integration.ViewsIntegrationTest;
8889
import org.apache.ignite.internal.processors.query.calcite.integration.tpch.TpchScale001Test;
8990
import org.apache.ignite.internal.processors.query.calcite.integration.tpch.TpchScale010Test;
@@ -177,6 +178,7 @@
177178
CacheStoreTest.class,
178179
MultiDcQueryMappingTest.class,
179180
TxWithExceptionalInterceptorTest.class,
181+
UserDefinedTxAwareFunctionsIntegrationTest.class,
180182
CacheWithInterceptorIntegrationTest.class,
181183
TxWithExceptionalInterceptorTest.class,
182184
SelectByKeyFieldTest.class,

0 commit comments

Comments
 (0)