Skip to content

Commit c5e7062

Browse files
Merge remote-tracking branch 'origin/master' into ignite-28356
2 parents e3335de + 2b10e24 commit c5e7062

7 files changed

Lines changed: 234 additions & 27 deletions

File tree

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
145145
/** */
146146
private Object[] correlations = new Object[16];
147147

148+
/** Entries holder per execution thread. */
149+
private static final ThreadLocal<Collection<QueryTxEntry>> txEntriesHolder = new ThreadLocal<>();
150+
148151
/**
149152
* @param qctx Parent base query context.
150153
* @param qryId Query ID.
@@ -182,7 +185,7 @@ public ExecutionContext(
182185
this.ioTracker = ioTracker;
183186
this.params = params;
184187
this.timeout = timeout;
185-
this.qryTxEntries = qryTxEntries;
188+
this.qryTxEntries = qryTxEntries == null ? txEntriesHolder.get() : qryTxEntries;
186189

187190
startTs = U.currentTimeMillis();
188191

@@ -421,12 +424,17 @@ public void execute(RunnableX task, Consumer<Throwable> onError) {
421424

422425
executor.execute(qryId, fragmentId(), () -> {
423426
try {
427+
txEntriesHolder.set(qryTxEntries);
428+
424429
if (!isCancelled())
425430
task.run();
426431
}
427432
catch (Throwable e) {
428433
onError.accept(e);
429434
}
435+
finally {
436+
txEntriesHolder.remove();
437+
}
430438
});
431439
}
432440

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.query.calcite.integration;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import org.apache.ignite.Ignite;
23+
import org.apache.ignite.IgniteCache;
24+
import org.apache.ignite.IgniteCheckedException;
25+
import org.apache.ignite.Ignition;
26+
import org.apache.ignite.cache.QueryEntity;
27+
import org.apache.ignite.cache.query.SqlFieldsQuery;
28+
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
29+
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
30+
import org.apache.ignite.configuration.CacheConfiguration;
31+
import org.apache.ignite.configuration.IgniteConfiguration;
32+
import org.apache.ignite.internal.IgniteInternalFuture;
33+
import org.apache.ignite.internal.util.typedef.F;
34+
import org.apache.ignite.testframework.GridTestUtils;
35+
import org.apache.ignite.testframework.junits.WithSystemProperty;
36+
import org.apache.ignite.transactions.Transaction;
37+
import org.junit.Test;
38+
39+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
40+
import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
41+
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
42+
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
43+
import static org.hamcrest.CoreMatchers.equalTo;
44+
import static org.junit.Assert.assertThat;
45+
46+
/**
47+
* Integration test for user defined functions with tx aware.
48+
*/
49+
@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, value = "true")
50+
public class UserDefinedTxAwareFunctionsIntegrationTest extends AbstractBasicIntegrationTest {
51+
/** */
52+
private static final int THREAD_NUM = 10;
53+
54+
/** {@inheritDoc} */
55+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
56+
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
57+
58+
cfg.getSqlConfiguration().setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration());
59+
cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true);
60+
cfg.setQueryThreadPoolSize(2 * THREAD_NUM + 1);
61+
62+
return cfg;
63+
}
64+
65+
/** Check tx aware UDF execution results. */
66+
@Test
67+
public void testTxAwareUserDefinedFunc() {
68+
assertTrue(nodeCount() > 1);
69+
int nodeCnt = nodeCount();
70+
71+
List<List<Object>> refResults = new ArrayList<>();
72+
73+
IgniteCache<Integer, Object> cache = client.getOrCreateCache(cacheConfig());
74+
75+
refResults.add(List.of(0, Integer.toString(0)));
76+
// Insert outside tx.
77+
cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) VALUES (?, ?)").setArgs(0, 0)).getAll();
78+
79+
try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
80+
for (int i = 1; i < 2 * nodeCnt; ++i) {
81+
refResults.add(List.of(i, Integer.toString(i)));
82+
cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) VALUES (?, ?)").setArgs(i, i)).getAll();
83+
}
84+
85+
// Simple select without UDF.
86+
List<List<?>> selectResult = cache
87+
.query(new SqlFieldsQuery("SELECT id, name FROM Employer ORDER BY id"))
88+
.getAll();
89+
90+
assertThat(selectResult, equalTo(refResults));
91+
92+
// Select with UDF.
93+
List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT customTableFunc() AS result")).getAll();
94+
95+
assertThat(res.get(0).get(0), equalTo(refResults));
96+
97+
// Select with nested UDF.
98+
res = cache.query(new SqlFieldsQuery("SELECT customNestedTableFunc() AS result")).getAll();
99+
100+
assertThat(res.get(0).get(0), equalTo(refResults));
101+
102+
// UDF participate in DML.
103+
cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) VALUES (100, nameAsStr(1))")).getAll();
104+
105+
res = cache.query(new SqlFieldsQuery("SELECT name FROM Employer WHERE id = 100")).getAll();
106+
107+
assertEquals("1", res.get(0).get(0));
108+
109+
for (int i = 0; i < 2 * nodeCnt; ++i) {
110+
// A bit different case of UDF.
111+
List<List<?>> res1 = cache.query(new SqlFieldsQuery("SELECT nameTableFunc(?) AS result").setArgs(i)).getAll();
112+
113+
List<List<?>> res2 = (List<List<?>>)res1.get(0).get(0);
114+
115+
assertThat(res2.get(0).get(0), equalTo(Integer.toString(i)));
116+
}
117+
118+
tx.commit();
119+
}
120+
}
121+
122+
/** */
123+
@Test
124+
public void testIsolationCorrectnessWithUdf() throws IgniteCheckedException {
125+
assertTrue(nodeCount() > 1);
126+
int nodeCnt = nodeCount();
127+
128+
IgniteCache<Integer, Object> cache = client.getOrCreateCache(cacheConfig());
129+
130+
/* The pool size should be greater than the maximum number of concurrent queries initiated by UDFs. */
131+
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
132+
for (int iter = 0; iter < 10; ++iter) {
133+
try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
134+
List<List<Object>> refResults = new ArrayList<>();
135+
136+
for (int i = 0; i < 2 * nodeCnt; ++i) {
137+
refResults.add(List.of(i, Integer.toString(i)));
138+
cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) VALUES (?, ?)").setArgs(i, i)).getAll();
139+
}
140+
141+
List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT customNestedTableFunc() AS result")).getAll();
142+
143+
assertThat(res.get(0).get(0), equalTo(refResults));
144+
145+
tx.rollback();
146+
}
147+
}
148+
}, THREAD_NUM, "calcite-tx-with-udf");
149+
150+
fut.get(30_000);
151+
}
152+
153+
/** */
154+
private CacheConfiguration<Integer, Object> cacheConfig() {
155+
return this.<Integer, Object>cacheConfiguration()
156+
.setName(DEFAULT_CACHE_NAME)
157+
.setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class)
158+
.setTableName("Employer")
159+
.addQueryField("ID", Integer.class.getName(), null)
160+
.setKeyFieldName("ID")
161+
))
162+
.setSqlFunctionClasses(InnerSqlFunctionsLibrary.class)
163+
.setAtomicityMode(TRANSACTIONAL);
164+
}
165+
166+
/** */
167+
public static class InnerSqlFunctionsLibrary {
168+
/** */
169+
@QuerySqlFunction
170+
public List<List<?>> customTableFunc() {
171+
Ignite ignite = Ignition.localIgnite();
172+
173+
return ignite.cache(DEFAULT_CACHE_NAME)
174+
.query(new SqlFieldsQuery("SELECT id, name FROM Employer ORDER BY id"))
175+
.getAll();
176+
}
177+
178+
/** */
179+
@QuerySqlFunction
180+
public List<List<?>> customNestedTableFunc() {
181+
Ignite ignite = Ignition.localIgnite();
182+
183+
Object res = ignite.cache(DEFAULT_CACHE_NAME)
184+
.query(new SqlFieldsQuery("SELECT customTableFunc() AS result"))
185+
.getAll().get(0).get(0);
186+
187+
return (List<List<?>>)res;
188+
}
189+
190+
/** */
191+
@QuerySqlFunction
192+
public static List<List<?>> nameTableFunc(int id) {
193+
Ignite ignite = Ignition.localIgnite();
194+
195+
return ignite.cache(DEFAULT_CACHE_NAME)
196+
.query(new SqlFieldsQuery("SELECT name FROM Employer WHERE id = ?").setArgs(id))
197+
.getAll();
198+
}
199+
200+
/** */
201+
@QuerySqlFunction
202+
public static String nameAsStr(int id) {
203+
Ignite ignite = Ignition.localIgnite();
204+
205+
List<List<?>> res = ignite.cache(DEFAULT_CACHE_NAME)
206+
.query(new SqlFieldsQuery("SELECT name FROM Employer WHERE id = ?").setArgs(id))
207+
.getAll();
208+
209+
return (String)res.get(0).get(0);
210+
}
211+
}
212+
}

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,8 @@ private Node<Object[]> implementFragment(
353353
NoOpIoTracker.INSTANCE,
354354
0,
355355
Commons.parametersMap(ctx.parameters()),
356-
null);
356+
null
357+
);
357358

358359
return new LogicalRelImplementor<>(ectx, c -> r -> 0, mailboxRegistry, exchangeSvc,
359360
new TestFailureProcessor(kernal)).go(fragment.root());

modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.apache.ignite.internal.processors.query.calcite.integration.UserDdlIntegrationTest;
8585
import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTest;
8686
import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTransactionalTest;
87+
import org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedTxAwareFunctionsIntegrationTest;
8788
import org.apache.ignite.internal.processors.query.calcite.integration.ViewsIntegrationTest;
8889
import org.apache.ignite.internal.processors.query.calcite.integration.tpch.TpchScale001Test;
8990
import org.apache.ignite.internal.processors.query.calcite.integration.tpch.TpchScale010Test;
@@ -177,6 +178,7 @@
177178
CacheStoreTest.class,
178179
MultiDcQueryMappingTest.class,
179180
TxWithExceptionalInterceptorTest.class,
181+
UserDefinedTxAwareFunctionsIntegrationTest.class,
180182
CacheWithInterceptorIntegrationTest.class,
181183
TxWithExceptionalInterceptorTest.class,
182184
SelectByKeyFieldTest.class,

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataRegionMetricsImpl.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Collections;
2222
import java.util.List;
2323
import java.util.Optional;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2425
import org.apache.ignite.DataRegionMetrics;
2526
import org.apache.ignite.DataRegionMetricsProvider;
2627
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -30,7 +31,6 @@
3031
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMetricsImpl;
3132
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
3233
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
33-
import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl;
3434
import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
3535
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
3636
import org.apache.ignite.internal.processors.metric.impl.LongAdderWithDelegateMetric;
@@ -180,7 +180,7 @@ private static LongAdderMetricDelegate delegate(LongAdderMetric delegate) {
180180
private final PeriodicHistogramMetricImpl pageTsHistogram;
181181

182182
/** Metric indicating whether page eviction has started. */
183-
private final BooleanMetricImpl evictionsStarted;
183+
private final AtomicBoolean evictionsStarted = new AtomicBoolean();
184184

185185
/**
186186
* Same as {@link #DataRegionMetricsImpl(DataRegionConfiguration, GridKernalContext, DataRegionMetricsProvider)}
@@ -285,7 +285,7 @@ public DataRegionMetricsImpl(
285285
mreg.longMetric("MaxSize", "Maximum memory region size in bytes defined by its data region.")
286286
.value(dataRegionCfg.getMaxSize());
287287

288-
evictionsStarted = mreg.booleanMetric("EvictionsStarted",
288+
mreg.register("EvictionsStarted", evictionsStarted::get,
289289
"True if page eviction was triggered due to data region memory pressure.");
290290

291291
if (persistenceEnabled) {
@@ -885,12 +885,11 @@ public Collection<PagesTimestampHistogramView> pagesTimestampHistogramView() {
885885

886886
/** {@inheritDoc} */
887887
@Override public boolean isEvictionsStarted() {
888-
return evictionsStarted.value();
888+
return evictionsStarted.get();
889889
}
890890

891-
/** */
892-
public void onPageEvictionsStarted() {
893-
if (!evictionsStarted.value())
894-
evictionsStarted.value(true);
891+
/** @return {@code True} if eviction has started for the first time. */
892+
public boolean onPageEvictionsStarted() {
893+
return evictionsStarted.compareAndSet(false, true);
895894
}
896895
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
167167
/** Data storege metrics. */
168168
protected final DataStorageMetricsImpl dsMetrics;
169169

170-
/** */
171-
private final Object mux = new Object();
172-
173170
/**
174171
* @param ctx Kernal context.
175172
*/
@@ -1247,19 +1244,7 @@ public void ensureFreeSpace(DataRegion memPlc) throws IgniteCheckedException {
12471244
return;
12481245

12491246
while (memPlc.evictionTracker().evictionRequired()) {
1250-
boolean shouldLog = false;
1251-
1252-
if (!memPlc.metrics().isEvictionsStarted()) {
1253-
synchronized (mux) {
1254-
if (!memPlc.metrics().isEvictionsStarted()) {
1255-
memPlc.metrics().onPageEvictionsStarted();
1256-
1257-
shouldLog = true;
1258-
}
1259-
}
1260-
}
1261-
1262-
if (shouldLog) {
1247+
if (memPlc.metrics().onPageEvictionsStarted()) {
12631248
U.warn(log, "Page-based evictions started." +
12641249
" Consider increasing 'maxSize' on Data Region configuration: " + memPlc.config().getName());
12651250
}

modules/platforms/dotnet/Apache.Ignite.Core.Tests/Unmanaged/JniThreadDetachTest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ private string[] GetJavaThreadNames()
5050
{
5151
return Ignite.GetCompute()
5252
.ExecuteJavaTask<string[]>("org.apache.ignite.platform.PlatformThreadNamesTask", null)
53-
.Where(x => !x.StartsWith("pub-#") && !x.StartsWith("jvm-"))
53+
.Where(x => !x.StartsWith("pub-#") && !x.StartsWith("jvm-") && !x.StartsWith("mgmt-"))
5454
.OrderBy(x => x)
5555
.ToArray();
5656
}

0 commit comments

Comments
 (0)