Skip to content

Commit e8f16e0

Browse files
authored
IGNITE-28277 CacheInterceptor need to take into account cache keepBinary mode (#12911)
I make a fix only for calcite related part, honestly - i afraid to make an equal changes on h2 related part and prefer to store it - as is. If approach is ok - i fill follow up issue for related documentation change.
1 parent bc00caf commit e8f16e0

24 files changed

Lines changed: 462 additions & 53 deletions

File tree

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.benchmarks.jmh.sql;
19+
20+
import java.util.concurrent.ThreadLocalRandom;
21+
import java.util.concurrent.TimeUnit;
22+
import org.apache.ignite.cache.CacheInterceptorAdapter;
23+
import org.apache.ignite.configuration.CacheConfiguration;
24+
import org.openjdk.jmh.annotations.Benchmark;
25+
import org.openjdk.jmh.annotations.BenchmarkMode;
26+
import org.openjdk.jmh.annotations.Fork;
27+
import org.openjdk.jmh.annotations.Measurement;
28+
import org.openjdk.jmh.annotations.Mode;
29+
import org.openjdk.jmh.annotations.OutputTimeUnit;
30+
import org.openjdk.jmh.annotations.Param;
31+
import org.openjdk.jmh.annotations.Scope;
32+
import org.openjdk.jmh.annotations.State;
33+
import org.openjdk.jmh.annotations.Warmup;
34+
import org.openjdk.jmh.runner.Runner;
35+
import org.openjdk.jmh.runner.options.Options;
36+
import org.openjdk.jmh.runner.options.OptionsBuilder;
37+
38+
/**
39+
* Benchmark cache with interceptor queries.
40+
*/
41+
@Fork(1)
42+
@BenchmarkMode(Mode.Throughput)
43+
@OutputTimeUnit(TimeUnit.SECONDS)
44+
@Warmup(iterations = 5, time = 5)
45+
@Measurement(iterations = 10, time = 5)
46+
@State(Scope.Benchmark)
47+
public class JmhCacheWithInterceptorBenchmark extends JmhSqlAbstractBenchmark {
48+
/** Query engine. */
49+
@Param({"CALCITE"})
50+
protected String engine;
51+
52+
/** Keep binary mode. */
53+
@Param({"true", "false"})
54+
protected boolean keepBinary;
55+
56+
/** {@inheritDoc} */
57+
@Override protected CacheConfiguration<Integer, Item> cacheConfiguration() {
58+
return super.cacheConfiguration().setInterceptor(new CacheInterceptorAdapter<>());
59+
}
60+
61+
/** Test update operation. */
62+
@Benchmark
63+
public void update() {
64+
int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
65+
66+
executeSql("UPDATE CACHE.Item SET fld = fld + 1 WHERE fldIdx=?", key);
67+
}
68+
69+
/**
70+
* Run benchmarks.
71+
*
72+
* @param args Args.
73+
* @throws Exception Exception.
74+
*/
75+
public static void main(String[] args) throws Exception {
76+
final Options options = new OptionsBuilder()
77+
.include(JmhCacheWithInterceptorBenchmark.class.getSimpleName())
78+
.build();
79+
80+
new Runner(options).run();
81+
}
82+
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,9 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
672672
try {
673673
SessionContextImpl sesCtx = qry.context().unwrap(SessionContextImpl.class);
674674

675+
QueryProperties props = qry.context().unwrap(QueryProperties.class);
676+
boolean keepBinaryMode = props == null || props.keepBinary();
677+
675678
QueryStartRequest req = new QueryStartRequest(
676679
qry.id(),
677680
qry.localQueryId(),
@@ -684,7 +687,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
684687
parametersMarshalled,
685688
timeout,
686689
ectx.getQryTxEntries(),
687-
sesCtx == null ? null : sesCtx.attributes()
690+
sesCtx == null ? null : sesCtx.attributes(),
691+
keepBinaryMode
688692
);
689693

690694
messageService().send(nodeId, req);
@@ -881,8 +885,13 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
881885
)
882886
);
883887

888+
boolean keepBinaryMode = msg.keepBinaryMode();
889+
QueryProperties qryProps = new QueryProperties(null, keepBinaryMode, false);
890+
884891
final BaseQueryContext qctx = createQueryContext(
885-
msg.applicationAttributes() == null ? Contexts.empty() : Contexts.of(new SessionContextImpl(msg.applicationAttributes())),
892+
msg.applicationAttributes() == null ?
893+
Contexts.of(qryProps) :
894+
Contexts.of(new SessionContextImpl(msg.applicationAttributes()), qryProps),
886895
msg.schema());
887896

888897
FragmentPlan fragmentPlan = fragmentPlanCache.computeIfAbsent(msg.root(), k -> prepareFragment(qctx, k));

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ private AbstractMatchingHashJoin(
394394
private void downstreamPush(Row left, Row right) throws Exception {
395395
requested--;
396396

397-
downstream().push(outRowFactory.apply(left, right));;
397+
downstream().push(outRowFactory.apply(left, right));
398398
}
399399

400400
/** {@inheritDoc} */

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@
3131
import org.apache.ignite.IgniteCheckedException;
3232
import org.apache.ignite.internal.cache.context.SessionContextImpl;
3333
import org.apache.ignite.internal.processors.cache.GridCacheContext;
34-
import org.apache.ignite.internal.processors.cache.GridCacheProxyImpl;
34+
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
3535
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
3636
import org.apache.ignite.internal.processors.query.IgniteSQLException;
37+
import org.apache.ignite.internal.processors.query.QueryProperties;
3738
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
3839
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
3940
import org.apache.ignite.internal.processors.query.calcite.schema.ModifyTuple;
@@ -200,9 +201,15 @@ private void flushTuples(boolean force) throws IgniteCheckedException {
200201
this.tuples = new ArrayList<>(MODIFY_BATCH_SIZE);
201202

202203
GridCacheContext<Object, Object> cctx = desc.cacheContext();
203-
GridCacheProxyImpl<Object, Object> cache = cctx.cache().keepBinary();
204+
IgniteInternalCache<Object, Object> cache = cctx.cache();
204205
GridNearTxLocal tx = Commons.queryTransaction(context(), cctx.shared());
205206

207+
QueryProperties props = context().unwrap(QueryProperties.class);
208+
boolean keepBinaryMode = props == null || props.keepBinary();
209+
210+
if (keepBinaryMode)
211+
cache = cache.keepBinary();
212+
206213
if (tx == null)
207214
invokeOutsideTransaction(tuples, cache);
208215
else
@@ -217,7 +224,7 @@ private void flushTuples(boolean force) throws IgniteCheckedException {
217224
*/
218225
private void invokeOutsideTransaction(
219226
List<ModifyTuple> tuples,
220-
GridCacheProxyImpl<Object, Object> cache
227+
IgniteInternalCache<Object, Object> cache
221228
) throws IgniteCheckedException {
222229
SessionContextImpl sesCtx = context().unwrap(SessionContextImpl.class);
223230
Map<String, String> sesAttrs = sesCtx == null ? null : sesCtx.attributes();
@@ -251,7 +258,7 @@ private void invokeOutsideTransaction(
251258
*/
252259
private void invokeInsideTransaction(
253260
List<ModifyTuple> tuples,
254-
GridCacheProxyImpl<Object, Object> cache,
261+
IgniteInternalCache<Object, Object> cache,
255262
GridNearTxLocal userTx
256263
) throws IgniteCheckedException {
257264
userTx.resume();

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ public class QueryStartRequest implements CalciteMarshalableMessage, ExecutionCo
7979
@Order(10)
8080
@Nullable Map<String, String> appAttrs;
8181

82+
/** */
83+
@Order(11)
84+
boolean keepBinaryMode;
85+
8286
/** */
8387
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
8488
public QueryStartRequest(
@@ -93,7 +97,8 @@ public QueryStartRequest(
9397
@Nullable byte[] paramsBytes,
9498
long timeout,
9599
@Nullable Collection<QueryTxEntry> qryTxEntries,
96-
@Nullable Map<String, String> appAttrs
100+
@Nullable Map<String, String> appAttrs,
101+
boolean keepBinaryMode
97102
) {
98103
this.qryId = qryId;
99104
this.originatingQryId = originatingQryId;
@@ -107,6 +112,7 @@ public QueryStartRequest(
107112
this.timeout = timeout;
108113
this.qryTxEntries = qryTxEntries;
109114
this.appAttrs = appAttrs;
115+
this.keepBinaryMode = keepBinaryMode;
110116
}
111117

112118
/** */
@@ -199,6 +205,11 @@ public long timeout() {
199205
return appAttrs;
200206
}
201207

208+
/** */
209+
public boolean keepBinaryMode() {
210+
return keepBinaryMode;
211+
}
212+
202213
/** {@inheritDoc} */
203214
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
204215
if (paramsBytes == null && params != null)

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class CancelTest extends GridCommonAbstractTest {
7272
.setKeyFieldName("id")
7373
.setValueFieldName("val")
7474
.addQueryField("id", Integer.class.getName(), null)
75-
.addQueryField("val", String.class.getName(), null);;
75+
.addQueryField("val", String.class.getName(), null);
7676

7777
return super.getConfiguration(igniteInstanceName)
7878
.setCacheConfiguration(

0 commit comments

Comments
 (0)