Skip to content

Commit fdd19e1

Browse files
petrov-mgNSAmelchev
authored andcommitted
IGNITE-23958 Fixed security context propagation for async transactional operations (#11732)
(cherry picked from commit d547038)
1 parent 34b21f1 commit fdd19e1

5 files changed

Lines changed: 276 additions & 12 deletions

File tree

modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_CACHED_STRINGS_THRESHOLD;
133133
import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_FILE_MAX_SIZE;
134134
import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_FLUSH_SIZE;
135+
import static org.apache.ignite.internal.processors.platform.client.ClientRequestHandler.DFLT_ASYNC_REQUEST_WAIT_TIMEOUT_MILLIS;
135136
import static org.apache.ignite.internal.processors.query.QueryUtils.DFLT_INDEXING_DISCOVERY_HISTORY_SIZE;
136137
import static org.apache.ignite.internal.processors.query.schema.SchemaIndexCachePartitionWorker.DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE;
137138
import static org.apache.ignite.internal.processors.rest.GridRestProcessor.DFLT_SES_TIMEOUT;
@@ -1319,6 +1320,17 @@ public final class IgniteSystemProperties {
13191320
defaults = "" + DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT)
13201321
public static final String IGNITE_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT = "IGNITE_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT";
13211322

1323+
/**
1324+
* Timeout in milliseconds that determines how long Ignite will synchronously wait for asynchronous thin client
1325+
* requests to complete before releasing the thread.
1326+
*/
1327+
@SystemProperty(
1328+
value = "Timeout in milliseconds that determines how long Ignite will synchronously wait for" +
1329+
" asynchronous thin client requests to complete before releasing the thread",
1330+
type = Long.class,
1331+
defaults = "" + DFLT_ASYNC_REQUEST_WAIT_TIMEOUT_MILLIS)
1332+
public static final String IGNITE_THIN_CLIENT_ASYNC_REQUESTS_WAIT_TIMEOUT = "IGNITE_THIN_CLIENT_ASYNC_REQUESTS_WAIT_TIMEOUT";
1333+
13221334
/**
13231335
* Default value is {@code false}.
13241336
*

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@
110110
import org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater;
111111
import org.apache.ignite.internal.processors.performancestatistics.OperationType;
112112
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
113+
import org.apache.ignite.internal.processors.security.OperationSecurityContext;
114+
import org.apache.ignite.internal.processors.security.SecurityContext;
113115
import org.apache.ignite.internal.processors.task.GridInternal;
114116
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
115117
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -3850,24 +3852,30 @@ protected <T> IgniteInternalFuture<T> asyncOp(
38503852
};
38513853

38523854
if (fut != null && !fut.isDone()) {
3853-
IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
3854-
(IgniteOutClosure<IgniteInternalFuture>)() -> {
3855-
GridFutureAdapter resFut = new GridFutureAdapter();
3855+
SecurityContext secCtx = ctx.kernalContext().security().securityContext();
3856+
3857+
IgniteInternalFuture<T> f = new GridEmbeddedFuture<>(
3858+
fut,
3859+
(IgniteOutClosure<IgniteInternalFuture<T>>)() -> {
3860+
GridFutureAdapter<T> resFut = new GridFutureAdapter<>();
38563861

38573862
ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> {
3858-
IgniteInternalFuture fut0;
3863+
IgniteInternalFuture<T> opFut;
38593864

38603865
if (ctx.kernalContext().isStopping())
3861-
fut0 = new GridFinishedFuture<>(
3866+
opFut = new GridFinishedFuture<>(
38623867
new IgniteCheckedException("Operation has been cancelled (node or cache is stopping)."));
38633868
else if (ctx.gate().isStopped())
3864-
fut0 = new GridFinishedFuture<>(new CacheStoppedException(ctx.name()));
3869+
opFut = new GridFinishedFuture<>(new CacheStoppedException(ctx.name()));
38653870
else {
38663871
ctx.operationContextPerCall(opCtx);
38673872
ctx.shared().txContextReset();
38683873

3869-
try {
3870-
fut0 = op.op(tx0).chain(clo);
3874+
try (OperationSecurityContext ignored = ctx.kernalContext().security().withContext(secCtx)) {
3875+
opFut = op.op(tx0).chain(clo);
3876+
}
3877+
catch (Throwable e) {
3878+
opFut = new GridFinishedFuture<>(e);
38713879
}
38723880
finally {
38733881
// It is necessary to clear tx context in this thread as well.
@@ -3876,9 +3884,9 @@ else if (ctx.gate().isStopped())
38763884
}
38773885
}
38783886

3879-
fut0.listen(() -> {
3887+
opFut.listen(lsnrFut -> {
38803888
try {
3881-
resFut.onDone(fut0.get());
3889+
resFut.onDone(lsnrFut.get());
38823890
}
38833891
catch (Throwable ex) {
38843892
resFut.onDone(ex);

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.ignite.IgniteCheckedException;
2121
import org.apache.ignite.IgniteIllegalStateException;
2222
import org.apache.ignite.IgniteLogger;
23+
import org.apache.ignite.IgniteSystemProperties;
2324
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
2425
import org.apache.ignite.internal.IgniteInternalFuture;
2526
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
@@ -39,6 +40,7 @@
3940
import org.apache.ignite.internal.util.typedef.internal.U;
4041
import org.apache.ignite.plugin.security.SecurityException;
4142

43+
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THIN_CLIENT_ASYNC_REQUESTS_WAIT_TIMEOUT;
4244
import static org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature.BITMAP_FEATURES;
4345
import static org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature.PARTITION_AWARENESS;
4446

@@ -47,7 +49,13 @@
4749
*/
4850
public class ClientRequestHandler implements ClientListenerRequestHandler {
4951
/** Timeout to wait for async requests completion, to handle them as regular sync requests. */
50-
private static final long ASYNC_REQUEST_WAIT_TIMEOUT_MILLIS = 10L;
52+
public static final long DFLT_ASYNC_REQUEST_WAIT_TIMEOUT_MILLIS = 10L;
53+
54+
/** */
55+
private final long asyncReqWaitTimeout = IgniteSystemProperties.getLong(
56+
IGNITE_THIN_CLIENT_ASYNC_REQUESTS_WAIT_TIMEOUT,
57+
DFLT_ASYNC_REQUEST_WAIT_TIMEOUT_MILLIS
58+
);
5159

5260
/** Client context. */
5361
private final ClientConnectionContext ctx;
@@ -122,10 +130,13 @@ private ClientListenerResponse handle0(ClientListenerRequest req) {
122130
if (req0.isAsync(ctx)) {
123131
IgniteInternalFuture<ClientResponse> fut = req0.processAsync(ctx);
124132

133+
if (asyncReqWaitTimeout <= 0)
134+
return new ClientAsyncResponse(req0.requestId(), fut);
135+
125136
try {
126137
// Give request a chance to be executed and response processed by the current thread,
127138
// so we can avoid any performance drops caused by async requests execution.
128-
return fut.get(ASYNC_REQUEST_WAIT_TIMEOUT_MILLIS);
139+
return fut.get(asyncReqWaitTimeout);
129140
}
130141
catch (IgniteFutureTimeoutCheckedException ignored) {
131142
return new ClientAsyncResponse(req0.requestId(), fut);
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.security;
19+
20+
import java.security.Permissions;
21+
import java.util.Arrays;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.function.Function;
27+
import com.google.common.collect.ImmutableSet;
28+
import org.apache.ignite.Ignition;
29+
import org.apache.ignite.client.ClientAuthorizationException;
30+
import org.apache.ignite.client.ClientCache;
31+
import org.apache.ignite.client.ClientException;
32+
import org.apache.ignite.client.IgniteClient;
33+
import org.apache.ignite.client.IgniteClientFuture;
34+
import org.apache.ignite.configuration.CacheConfiguration;
35+
import org.apache.ignite.configuration.ClientConfiguration;
36+
import org.apache.ignite.configuration.ClientConnectorConfiguration;
37+
import org.apache.ignite.configuration.IgniteConfiguration;
38+
import org.apache.ignite.internal.IgniteEx;
39+
import org.apache.ignite.internal.IgniteInternalFuture;
40+
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
41+
import org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
42+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
43+
import org.apache.ignite.internal.processors.security.impl.TestSecurityData;
44+
import org.apache.ignite.internal.processors.security.impl.TestSecurityPluginProvider;
45+
import org.apache.ignite.internal.util.typedef.X;
46+
import org.apache.ignite.plugin.security.SecurityPermissionSet;
47+
import org.apache.ignite.testframework.GridTestUtils;
48+
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
49+
import org.junit.Test;
50+
import org.junit.runner.RunWith;
51+
import org.junit.runners.Parameterized;
52+
53+
import static java.util.Collections.singletonMap;
54+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
55+
import static org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
56+
import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_CLUSTER_STATE;
57+
import static org.apache.ignite.plugin.security.SecurityPermission.CACHE_CREATE;
58+
import static org.apache.ignite.plugin.security.SecurityPermission.CACHE_PUT;
59+
import static org.apache.ignite.plugin.security.SecurityPermission.CACHE_READ;
60+
import static org.apache.ignite.plugin.security.SecurityPermission.CACHE_REMOVE;
61+
import static org.apache.ignite.plugin.security.SecurityPermission.JOIN_AS_SERVER;
62+
import static org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.NO_PERMISSIONS;
63+
import static org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.create;
64+
65+
/** */
66+
@RunWith(Parameterized.class)
67+
public class SecurityContextInternalFuturePropagationTest extends GridCommonAbstractTest {
68+
/** */
69+
private static final int PRELOADED_KEY_CNT = 100;
70+
71+
/** */
72+
private static final AtomicInteger KEY_CNTR = new AtomicInteger();
73+
74+
/** */
75+
@Parameterized.Parameter()
76+
public Function<ClientCache<Object, Object>, IgniteClientFuture<?>> op;
77+
78+
/** */
79+
@Parameterized.Parameters()
80+
public static List<Function<ClientCache<Object, Object>, IgniteClientFuture<?>>> data() {
81+
return Arrays.asList(
82+
cache -> cache.getAllAsync(ImmutableSet.of(nextKey(), nextKey())), // 0
83+
cache -> cache.getAndPutAsync(nextKey(), 0), // 1
84+
cache -> cache.getAndPutIfAbsentAsync(nextKey(), 0), // 2
85+
cache -> cache.getAndPutIfAbsentAsync(PRELOADED_KEY_CNT + nextKey(), 0), // 3
86+
cache -> cache.getAndRemoveAsync(nextKey()), // 4
87+
cache -> cache.getAndReplaceAsync(nextKey(), 0), // 5
88+
cache -> cache.getAsync(nextKey()), // 6
89+
cache -> cache.putAllAsync(new HashMap<>() {{ put(nextKey(), 0); put(nextKey(), 0); }}), // 7
90+
cache -> cache.putAsync(nextKey(), 0), // 8
91+
cache -> cache.putIfAbsentAsync(PRELOADED_KEY_CNT + nextKey(), 0), // 9
92+
cache -> cache.removeAsync(nextKey()), // 10
93+
cache -> {
94+
int key = nextKey();
95+
return cache.removeAsync(key, key);
96+
}, // 11
97+
cache -> cache.replaceAsync(nextKey(), 0), // 12
98+
cache -> {
99+
int key = nextKey();
100+
101+
return cache.replaceAsync(key, key, 0);
102+
} // 13
103+
);
104+
}
105+
106+
/** {@inheritDoc} */
107+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
108+
return super.getConfiguration(igniteInstanceName)
109+
.setCommunicationSpi(new TestRecordingCommunicationSpi())
110+
.setUserAttributes(singletonMap(IDX_ATTR, getTestIgniteInstanceIndex(igniteInstanceName)))
111+
.setClientConnectorConfiguration(new ClientConnectorConfiguration()
112+
.setThreadPoolSize(1))
113+
.setPluginProviders(new TestSecurityPluginProvider(
114+
igniteInstanceName,
115+
"",
116+
create()
117+
.defaultAllowAll(false)
118+
.appendSystemPermissions(JOIN_AS_SERVER, ADMIN_CLUSTER_STATE)
119+
.appendCachePermissions(DEFAULT_CACHE_NAME, CACHE_CREATE)
120+
.build(),
121+
null,
122+
false,
123+
userData("forbidden_client", NO_PERMISSIONS),
124+
userData("allowed_client", create()
125+
.defaultAllowAll(false)
126+
.appendCachePermissions(DEFAULT_CACHE_NAME, CACHE_READ, CACHE_PUT, CACHE_REMOVE)
127+
.build())
128+
));
129+
}
130+
131+
/** {@inheritDoc} */
132+
@Override protected void afterTest() throws Exception {
133+
super.afterTest();
134+
135+
stopAllGrids();
136+
}
137+
138+
/** */
139+
@Test
140+
public void testSecurityContextInternalFuturePropagation() throws Exception {
141+
IgniteEx ignite = startGrids(2);
142+
143+
prepareCache(ignite);
144+
145+
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1));
146+
147+
spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite.name());
148+
149+
IgniteInternalFuture<IgniteEx> joinFut = GridTestUtils.runAsync(() -> startGrid(2));
150+
151+
spi.waitForBlocked();
152+
153+
try (
154+
IgniteClient allowedCli = startClient("allowed_client");
155+
IgniteClient forbiddenCli = startClient("forbidden_client")
156+
) {
157+
ClientCache<Object, Object> allowedCliCache = allowedCli.cache(DEFAULT_CACHE_NAME);
158+
ClientCache<Object, Object> forbiddenCliCache = forbiddenCli.cache(DEFAULT_CACHE_NAME);
159+
160+
IgniteClientFuture<?> op0Fut = op.apply(allowedCliCache);
161+
162+
// The following operation previously was executed with security context associated with the joining node user.
163+
IgniteClientFuture<?> op1Fut = op.apply(allowedCliCache);
164+
165+
// Simulates failure of a chained operation.
166+
IgniteClientFuture<?> op2Fut = op.apply(forbiddenCliCache);
167+
168+
// The failure of one operation in a chain should not leave the entire chain in a broken state.
169+
IgniteClientFuture<?> op3Fut = op.apply(allowedCliCache);
170+
171+
spi.stopBlock();
172+
173+
joinFut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
174+
175+
op0Fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
176+
op1Fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
177+
178+
try {
179+
op2Fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
180+
181+
fail();
182+
}
183+
catch (Exception e) {
184+
assertTrue(X.hasCause(e, "Authorization failed", ClientException.class)
185+
|| X.hasCause(e, "User is not authorized to perform this operation", ClientAuthorizationException.class));
186+
}
187+
188+
op3Fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
189+
}
190+
}
191+
192+
/** */
193+
private void prepareCache(IgniteEx ignite) throws Exception {
194+
ignite.createCache(new CacheConfiguration<>()
195+
.setName(DEFAULT_CACHE_NAME)
196+
.setAtomicityMode(TRANSACTIONAL)
197+
.setReadFromBackup(false)
198+
.setBackups(1)
199+
.setAffinity(new GridCacheModuloAffinityFunction(2, 1)));
200+
201+
awaitPartitionMapExchange();
202+
203+
try (IgniteClient cli = startClient("allowed_client")) {
204+
for (int i = 0; i < PRELOADED_KEY_CNT; i++)
205+
cli.cache(DEFAULT_CACHE_NAME).put(i, i);
206+
}
207+
}
208+
209+
/** */
210+
private static int nextKey() {
211+
return KEY_CNTR.incrementAndGet();
212+
}
213+
214+
/** */
215+
private static IgniteClient startClient(String login) {
216+
return Ignition.startClient(new ClientConfiguration()
217+
.setAddresses("127.0.0.1:10800")
218+
.setUserName(login)
219+
.setUserPassword(""));
220+
}
221+
222+
/** */
223+
private static TestSecurityData userData(String login, SecurityPermissionSet perms) {
224+
return new TestSecurityData(
225+
login,
226+
"",
227+
perms,
228+
new Permissions()
229+
);
230+
}
231+
}

modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.ignite.internal.processors.security.IgniteSecurityProcessorTest;
2121
import org.apache.ignite.internal.processors.security.InvalidServerTest;
2222
import org.apache.ignite.internal.processors.security.NodeSecurityContextPropagationTest;
23+
import org.apache.ignite.internal.processors.security.SecurityContextInternalFuturePropagationTest;
2324
import org.apache.ignite.internal.processors.security.cache.CacheOperationPermissionCheckTest;
2425
import org.apache.ignite.internal.processors.security.cache.CacheOperationPermissionCreateDestroyCheckTest;
2526
import org.apache.ignite.internal.processors.security.cache.ContinuousQueryPermissionCheckTest;
@@ -143,6 +144,7 @@
143144
NodeSecurityContextPropagationTest.class,
144145
NodeJoinPermissionsTest.class,
145146
ActivationOnJoinWithoutPermissionsWithPersistenceTest.class,
147+
SecurityContextInternalFuturePropagationTest.class,
146148
})
147149
public class SecurityTestSuite {
148150
/** */

0 commit comments

Comments
 (0)