Skip to content

Commit fba9935

Browse files
timoninmaximzstan
authored andcommitted
IGNITE-27696 Fix IndexQuery can query backup partition if setPartition is used (#12679)
(cherry picked from commit 4e2e691)
1 parent a0a665d commit fba9935

4 files changed

Lines changed: 28 additions & 23 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import org.apache.ignite.internal.util.lang.GridIterator;
6363
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
6464
import org.apache.ignite.internal.util.typedef.F;
65-
import org.apache.ignite.internal.util.typedef.P1;
6665
import org.apache.ignite.internal.util.typedef.T2;
6766
import org.apache.ignite.internal.util.typedef.X;
6867
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -840,9 +839,12 @@ private Collection<ClusterNode> nodes() throws IgniteCheckedException {
840839
}
841840

842841
/**
842+
* Collects query data nodes matching specified {@code prj} and {@code part}.
843+
*
843844
* @param cctx Cache context.
844845
* @param prj Projection (optional).
845-
* @return Collection of data nodes in provided projection (if any).
846+
* @param part Partition (optional).
847+
* @return Collection of data nodes matching specified {@code prj} and {@code part}.
846848
* @throws IgniteCheckedException If partition number is invalid.
847849
*/
848850
private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
@@ -856,19 +858,14 @@ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
856858
if (prj == null && part == null)
857859
return affNodes;
858860

859-
if (part != null && part >= cctx.affinity().partitions())
860-
throw new IgniteCheckedException("Invalid partition number: " + part);
861+
if (part != null) {
862+
if (part >= cctx.affinity().partitions())
863+
throw new IgniteCheckedException("Invalid partition number: " + part);
861864

862-
final Set<ClusterNode> owners =
863-
part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part, topVer));
865+
affNodes = cctx.topology().nodes(part, topVer);
866+
}
864867

865-
return F.view(affNodes, new P1<ClusterNode>() {
866-
@Override public boolean apply(ClusterNode n) {
867-
return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
868-
(prj == null || prj.node(n.id()) != null) &&
869-
(part == null || owners.contains(n));
870-
}
871-
});
868+
return prj == null ? affNodes : F.view(affNodes, n -> prj.node(n.id()) != null);
872869
}
873870

874871
/** */

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ThreadLocalRandom;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.atomic.AtomicInteger;
3031
import org.apache.ignite.IgniteCheckedException;
@@ -38,6 +39,7 @@
3839
import org.apache.ignite.internal.processors.cache.query.reducer.TextQueryReducer;
3940
import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
4041
import org.apache.ignite.internal.util.lang.GridPlainCallable;
42+
import org.apache.ignite.internal.util.typedef.F;
4143
import org.apache.ignite.internal.util.typedef.internal.U;
4244

4345
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
@@ -92,7 +94,7 @@ protected GridCacheDistributedQueryFuture(
9294
qryMgr = (GridCacheDistributedQueryManager<K, V>)ctx.queries();
9395

9496
if (qry.query().partition() != null)
95-
nodes = Collections.singletonList(node(nodes));
97+
nodes = Collections.singletonList(cctx.isReplicated() ? localOrRemoteNode(nodes) : F.first(nodes));
9698

9799
streams = new ConcurrentHashMap<>(nodes.size());
98100

@@ -118,18 +120,23 @@ protected GridCacheDistributedQueryFuture(
118120
}
119121

120122
/**
121-
* @return Nodes for query execution.
123+
* @return A local node if available, otherwise a random node from the given collection.
122124
*/
123-
private ClusterNode node(Collection<ClusterNode> nodes) {
125+
private ClusterNode localOrRemoteNode(Collection<ClusterNode> nodes) {
126+
int remoteNodeIdx = ThreadLocalRandom.current().nextInt(nodes.size());
127+
124128
ClusterNode rmtNode = null;
125129

126130
for (ClusterNode node : nodes) {
127131
if (node.isLocal())
128132
return node;
129133

130-
rmtNode = node;
134+
if (remoteNodeIdx-- == 0)
135+
rmtNode = node;
131136
}
132137

138+
assert rmtNode != null;
139+
133140
return rmtNode;
134141
}
135142

@@ -282,7 +289,7 @@ private void cancelPages(UUID nodeId) {
282289
GridCacheQueryRequest req = GridCacheQueryRequest.cancelRequest(cctx, reqId, fields());
283290

284291
if (nodeId.equals(cctx.localNodeId())) {
285-
// Process cancel query directly (without sending) for local node,
292+
// Process cancel query directly (without sending) for local node.
286293
cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
287294
@Override public Object call() {
288295
qryMgr.processQueryRequest(cctx.localNodeId(), req);

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,11 +1121,11 @@ public GridCloseableIterator indexQueryLocal(final CacheQuery qry) throws Ignite
11211121
int[] parts = null;
11221122

11231123
if (part != null) {
1124-
final GridDhtLocalPartition locPart = cctx.dht().topology().localPartition(part);
1124+
AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
11251125

1126-
if (locPart == null || locPart.state() != OWNING) {
1126+
if (cctx.isPartitioned() && !cctx.affinity().primaryByPartition(cctx.localNode(), part, topVer)) {
11271127
throw new CacheInvalidStateException("Failed to execute index query because required partition " +
1128-
"has not been found on local node [cacheName=" + cctx.name() + ", part=" + part + "]");
1128+
"is not primary on local node [cacheName=" + cctx.name() + ", part=" + part + ", topVer=" + topVer + ']');
11291129
}
11301130

11311131
parts = new int[] {part};

modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public static List<Object[]> params() {
7878
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
7979
.setCacheMode(cacheMode)
8080
.setIndexedTypes(Integer.class, Person.class)
81+
.setBackups(1)
8182
.setAffinity(new RendezvousAffinityFunction().setPartitions(100));
8283

8384
cfg.setCacheConfiguration(ccfg);
@@ -136,7 +137,7 @@ public void testSinglePartition() {
136137
}
137138
}
138139

139-
assertEquals(sendReq, TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size());
140+
assertEquals("part=" + part, sendReq, TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size());
140141
}
141142
}
142143

@@ -181,7 +182,7 @@ public void testLocalWithPartition() {
181182
GridTestUtils.assertThrows(null, () -> grid().cache("CACHE").query(qry).getAll(),
182183
client ? IgniteException.class : CacheInvalidStateException.class,
183184
client ? "Failed to execute local index query on a client node." :
184-
"Failed to execute index query because required partition has not been found on local node");
185+
"Failed to execute index query because required partition is not primary on local node");
185186
}
186187
else
187188
assertTrue(!grid().cache("CACHE").query(qry).getAll().isEmpty());

0 commit comments

Comments
 (0)