Skip to content

Commit b3ab436

Browse files
committed
Merge branch 'master' into IGNITE-28221-Combining-of-the-message-factories
# Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java # modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java # modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java # modules/core/src/test/java/org/apache/ignite/internal/util/distributed/MessagesPluginProvider.java
2 parents 85968d8 + 5208e0f commit b3ab436

108 files changed

Lines changed: 2919 additions & 2116 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.mvn/wrapper/MavenWrapperDownloader.java

Lines changed: 0 additions & 117 deletions
This file was deleted.

.mvn/wrapper/maven-wrapper.jar

-49.5 KB
Binary file not shown.

.mvn/wrapper/maven-wrapper.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515
# limitations under the License.
1616
#
1717

18+
wrapperVersion=3.3.4
19+
distributionType=only-script
1820
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.6/apache-maven-3.9.6-bin.zip
19-
wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.benchmarks.jmh.sql;
19+
20+
import java.util.concurrent.ThreadLocalRandom;
21+
import java.util.concurrent.TimeUnit;
22+
import org.apache.ignite.cache.CacheInterceptorAdapter;
23+
import org.apache.ignite.configuration.CacheConfiguration;
24+
import org.openjdk.jmh.annotations.Benchmark;
25+
import org.openjdk.jmh.annotations.BenchmarkMode;
26+
import org.openjdk.jmh.annotations.Fork;
27+
import org.openjdk.jmh.annotations.Measurement;
28+
import org.openjdk.jmh.annotations.Mode;
29+
import org.openjdk.jmh.annotations.OutputTimeUnit;
30+
import org.openjdk.jmh.annotations.Param;
31+
import org.openjdk.jmh.annotations.Scope;
32+
import org.openjdk.jmh.annotations.State;
33+
import org.openjdk.jmh.annotations.Warmup;
34+
import org.openjdk.jmh.runner.Runner;
35+
import org.openjdk.jmh.runner.options.Options;
36+
import org.openjdk.jmh.runner.options.OptionsBuilder;
37+
38+
/**
39+
* Benchmark cache with interceptor queries.
40+
*/
41+
@Fork(1)
42+
@BenchmarkMode(Mode.Throughput)
43+
@OutputTimeUnit(TimeUnit.SECONDS)
44+
@Warmup(iterations = 5, time = 5)
45+
@Measurement(iterations = 10, time = 5)
46+
@State(Scope.Benchmark)
47+
public class JmhCacheWithInterceptorBenchmark extends JmhSqlAbstractBenchmark {
48+
/** Query engine. */
49+
@Param({"CALCITE"})
50+
protected String engine;
51+
52+
/** Keep binary mode. */
53+
@Param({"true", "false"})
54+
protected boolean keepBinary;
55+
56+
/** {@inheritDoc} */
57+
@Override protected CacheConfiguration<Integer, Item> cacheConfiguration() {
58+
return super.cacheConfiguration().setInterceptor(new CacheInterceptorAdapter<>());
59+
}
60+
61+
/** Test update operation. */
62+
@Benchmark
63+
public void update() {
64+
int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
65+
66+
executeSql("UPDATE CACHE.Item SET fld = fld + 1 WHERE fldIdx=?", key);
67+
}
68+
69+
/**
70+
* Run benchmarks.
71+
*
72+
* @param args Args.
73+
* @throws Exception Exception.
74+
*/
75+
public static void main(String[] args) throws Exception {
76+
final Options options = new OptionsBuilder()
77+
.include(JmhCacheWithInterceptorBenchmark.class.getSimpleName())
78+
.build();
79+
80+
new Runner(options).run();
81+
}
82+
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
145145
/** */
146146
private Object[] correlations = new Object[16];
147147

148+
/** Entries holder per execution thread. */
149+
private static final ThreadLocal<Collection<QueryTxEntry>> txEntriesHolder = new ThreadLocal<>();
150+
148151
/**
149152
* @param qctx Parent base query context.
150153
* @param qryId Query ID.
@@ -182,7 +185,7 @@ public ExecutionContext(
182185
this.ioTracker = ioTracker;
183186
this.params = params;
184187
this.timeout = timeout;
185-
this.qryTxEntries = qryTxEntries;
188+
this.qryTxEntries = qryTxEntries == null ? txEntriesHolder.get() : qryTxEntries;
186189

187190
startTs = U.currentTimeMillis();
188191

@@ -421,12 +424,17 @@ public void execute(RunnableX task, Consumer<Throwable> onError) {
421424

422425
executor.execute(qryId, fragmentId(), () -> {
423426
try {
427+
txEntriesHolder.set(qryTxEntries);
428+
424429
if (!isCancelled())
425430
task.run();
426431
}
427432
catch (Throwable e) {
428433
onError.accept(e);
429434
}
435+
finally {
436+
txEntriesHolder.remove();
437+
}
430438
});
431439
}
432440

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,9 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
672672
try {
673673
SessionContextImpl sesCtx = qry.context().unwrap(SessionContextImpl.class);
674674

675+
QueryProperties props = qry.context().unwrap(QueryProperties.class);
676+
boolean keepBinaryMode = props == null || props.keepBinary();
677+
675678
QueryStartRequest req = new QueryStartRequest(
676679
qry.id(),
677680
qry.localQueryId(),
@@ -684,7 +687,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
684687
parametersMarshalled,
685688
timeout,
686689
ectx.getQryTxEntries(),
687-
sesCtx == null ? null : sesCtx.attributes()
690+
sesCtx == null ? null : sesCtx.attributes(),
691+
keepBinaryMode
688692
);
689693

690694
messageService().send(nodeId, req);
@@ -881,8 +885,13 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
881885
)
882886
);
883887

888+
boolean keepBinaryMode = msg.keepBinaryMode();
889+
QueryProperties qryProps = new QueryProperties(null, keepBinaryMode, false);
890+
884891
final BaseQueryContext qctx = createQueryContext(
885-
msg.applicationAttributes() == null ? Contexts.empty() : Contexts.of(new SessionContextImpl(msg.applicationAttributes())),
892+
msg.applicationAttributes() == null ?
893+
Contexts.of(qryProps) :
894+
Contexts.of(new SessionContextImpl(msg.applicationAttributes()), qryProps),
886895
msg.schema());
887896

888897
FragmentPlan fragmentPlan = fragmentPlanCache.computeIfAbsent(msg.root(), k -> prepareFragment(qctx, k));
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.query.calcite.exec;
19+
20+
import java.util.Map;
21+
import org.apache.calcite.util.ImmutableBitSet;
22+
import org.apache.calcite.util.ImmutableIntList;
23+
import org.apache.ignite.IgniteException;
24+
import org.apache.ignite.binary.BinaryObject;
25+
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
26+
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
27+
import org.apache.ignite.internal.cache.query.index.sorted.IndexPlainRowImpl;
28+
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
29+
import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler;
30+
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
31+
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey;
32+
import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKeyFactory;
33+
import org.apache.ignite.internal.processors.query.QueryUtils;
34+
import org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
35+
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
36+
import org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
37+
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
38+
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
39+
import org.jetbrains.annotations.Nullable;
40+
41+
/** Extension for column {@value QueryUtils#KEY_FIELD_NAME} in case of composite primary key. */
42+
public class IndexWrappedKeyScan<Row> extends IndexScan<Row> {
43+
/** */
44+
public IndexWrappedKeyScan(
45+
ExecutionContext<Row> ectx,
46+
CacheTableDescriptor desc,
47+
InlineIndex idx,
48+
ImmutableIntList idxFieldMapping,
49+
int[] parts,
50+
RangeIterable<Row> ranges,
51+
@Nullable ImmutableBitSet requiredColumns
52+
) {
53+
super(ectx, desc, idx, idxFieldMapping, parts, ranges, requiredColumns);
54+
}
55+
56+
/** */
57+
@Override protected IndexRow row2indexRow(Row bound) {
58+
if (bound == null)
59+
return null;
60+
61+
RowHandler<Row> rowHnd = ectx.rowHandler();
62+
63+
Object key = rowHnd.get(QueryUtils.KEY_COL, bound);
64+
assert key != null : String.format("idxName=%s, bound=%s", idx.name(), Commons.toString(rowHnd, bound));
65+
66+
if (key instanceof BinaryObject)
67+
return binaryObject2indexRow((BinaryObject)key);
68+
69+
throw new IgniteException(String.format(
70+
"Unsupported type for index boundary: [expected=%s, current=%s]",
71+
BinaryObject.class.getName(), key.getClass().getName()
72+
));
73+
}
74+
75+
/** */
76+
private IndexRow binaryObject2indexRow(BinaryObject o) {
77+
assert o.type().typeName().equals(idx.indexDefinition().typeDescriptor().keyTypeName()) : String.format(
78+
"idx=%s, o=%s, oType=%s, idxKeyType=%s",
79+
idx.name(), o, o.type().typeName(), idx.indexDefinition().typeDescriptor().keyTypeName()
80+
);
81+
82+
InlineIndexRowHandler idxRowHnd = idx.segment(0).rowHandler();
83+
IndexKey[] keys = new IndexKey[idx.indexDefinition().indexKeyDefinitions().size()];
84+
85+
int i = 0;
86+
for (Map.Entry<String, IndexKeyDefinition> e : idx.indexDefinition().indexKeyDefinitions().entrySet()) {
87+
String keyName = e.getKey();
88+
89+
ColumnDescriptor fieldDesc = desc.columnDescriptor(keyName);
90+
assert fieldDesc != null : String.format("idx=%s, o=%s, keyName=%s", idx.name(), o, keyName);
91+
92+
Object field = o.field(keyName);
93+
Object key = TypeUtils.fromInternal(ectx, field, fieldDesc.storageType());
94+
95+
keys[i++] = wrapIndexKey(key, e.getValue().indexKeyType());
96+
}
97+
98+
return new IndexPlainRowImpl(keys, idxRowHnd);
99+
}
100+
101+
/** */
102+
private IndexKey wrapIndexKey(Object key, IndexKeyType keyType) {
103+
return IndexKeyFactory.wrap(key, keyType, cctx.cacheObjectContext(), idx.indexDefinition().keyTypeSettings());
104+
}
105+
}

0 commit comments

Comments
 (0)