Skip to content

Commit a535985

Browse files
timoninmaximzstan
authored andcommitted
IGNITE-20973 Add handshake timeout configuration for java thin client (#12710)
1 parent 7e15f04 commit a535985

10 files changed

Lines changed: 196 additions & 27 deletions

File tree

modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.ignite.client.SslMode;
3939
import org.apache.ignite.client.SslProtocol;
4040
import org.apache.ignite.internal.client.thin.TcpIgniteClient;
41+
import org.apache.ignite.internal.util.typedef.internal.LT;
4142
import org.apache.ignite.internal.util.typedef.internal.S;
4243

4344
/**
@@ -57,8 +58,11 @@ public final class ClientConfiguration implements Serializable {
5758
/** @serial Tcp no delay. */
5859
private boolean tcpNoDelay = true;
5960

60-
/** @serial Timeout. 0 means infinite. */
61-
private int timeout;
61+
/** @serial Handshake timeout in milliseconds. 0 means infinite. */
62+
private int handshakeTimeout;
63+
64+
/** @serial Request timeout in milliseconds. 0 means infinite. */
65+
private int reqTimeout;
6266

6367
/** @serial Send buffer size. 0 means system default. */
6468
private int sndBufSize = 32 * 1024;
@@ -227,18 +231,65 @@ public ClientConfiguration setTcpNoDelay(boolean tcpNoDelay) {
227231
}
228232

229233
/**
230-
* @return Send/receive timeout in milliseconds.
234+
* @deprecated Use {@link #getHandshakeTimeout()} and {@link #getRequestTimeout()} instead.
235+
* @return Request timeout in milliseconds.
231236
*/
237+
@Deprecated
232238
public int getTimeout() {
233-
return timeout;
239+
if (reqTimeout != handshakeTimeout) {
240+
LT.warn(logger, String.format(
241+
"Deprecated getTimeout() API is used while request timeout (%d) differs from handshake timeout (%d). " +
242+
"Returning request timeout. Please use getRequestTimeout() and getHandshakeTimeout() instead.",
243+
reqTimeout, handshakeTimeout
244+
));
245+
}
246+
247+
return reqTimeout;
234248
}
235249

236250
/**
251+
* @deprecated Use {@link #setHandshakeTimeout(int)} and {@link #setRequestTimeout(int)} instead.
237252
* @param timeout Send/receive timeout in milliseconds.
238253
* @return {@code this} for chaining.
239254
*/
255+
@Deprecated
240256
public ClientConfiguration setTimeout(int timeout) {
241-
this.timeout = timeout;
257+
handshakeTimeout = timeout;
258+
reqTimeout = timeout;
259+
260+
return this;
261+
}
262+
263+
/**
264+
* @return Handshake timeout in milliseconds. 0 means infinite.
265+
*/
266+
public int getHandshakeTimeout() {
267+
return handshakeTimeout;
268+
}
269+
270+
/**
271+
* @param handshakeTimeout Handshake timeout in milliseconds. 0 means infinite.
272+
* @return {@code this} for chaining.
273+
*/
274+
public ClientConfiguration setHandshakeTimeout(int handshakeTimeout) {
275+
this.handshakeTimeout = handshakeTimeout;
276+
277+
return this;
278+
}
279+
280+
/**
281+
* @return Request timeout in milliseconds. 0 means infinite.
282+
*/
283+
public int getRequestTimeout() {
284+
return reqTimeout;
285+
}
286+
287+
/**
288+
* @param reqTimeout Request timeout in milliseconds. 0 means infinite.
289+
* @return {@code this} for chaining.
290+
*/
291+
public ClientConfiguration setRequestTimeout(int reqTimeout) {
292+
this.reqTimeout = reqTimeout;
242293

243294
return this;
244295
}

modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ final class ClientChannelConfiguration {
4444
/** Tcp no delay. */
4545
private final boolean tcpNoDelay;
4646

47-
/** Timeout. */
48-
private final int timeout;
47+
/** Handshake timeout. */
48+
private final int handshakeTimeout;
49+
50+
/** Request timeout. */
51+
private final int reqTimeout;
4952

5053
/** Send buffer size. */
5154
private final int sndBufSize;
@@ -123,7 +126,8 @@ final class ClientChannelConfiguration {
123126
ClientChannelConfiguration(ClientConfiguration cfg, List<InetSocketAddress> addrs) {
124127
this.sslMode = cfg.getSslMode();
125128
this.tcpNoDelay = cfg.isTcpNoDelay();
126-
this.timeout = cfg.getTimeout();
129+
this.handshakeTimeout = cfg.getHandshakeTimeout();
130+
this.reqTimeout = cfg.getRequestTimeout();
127131
this.sndBufSize = cfg.getSendBufferSize();
128132
this.rcvBufSize = cfg.getReceiveBufferSize();
129133
this.sslClientCertKeyStorePath = cfg.getSslClientCertificateKeyStorePath();
@@ -172,10 +176,17 @@ public boolean isTcpNoDelay() {
172176
}
173177

174178
/**
175-
* @return Timeout.
179+
* @return Handshake timeout.
180+
*/
181+
public int getHandshakeTimeout() {
182+
return handshakeTimeout;
183+
}
184+
185+
/**
186+
* @return Request timeout.
176187
*/
177-
public int getTimeout() {
178-
return timeout;
188+
public int getRequestTimeout() {
189+
return reqTimeout;
179190
}
180191

181192
/**

modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
160160
/** Executor for async operation listeners. */
161161
private final Executor asyncContinuationExecutor;
162162

163-
/** Send/receive timeout in milliseconds. */
164-
private final int timeout;
163+
/** Handshake timeout in milliseconds. */
164+
private final int handshakeTimeout;
165+
166+
/** Request timeout in milliseconds. */
167+
private final int reqTimeout;
165168

166169
/** Heartbeat timer. */
167170
private final Timer heartbeatTimer;
@@ -195,7 +198,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
195198
Executor cfgExec = cfg.getAsyncContinuationExecutor();
196199
asyncContinuationExecutor = cfgExec != null ? cfgExec : ForkJoinPool.commonPool();
197200

198-
timeout = cfg.getTimeout();
201+
handshakeTimeout = cfg.getHandshakeTimeout();
202+
reqTimeout = cfg.getRequestTimeout();
199203

200204
List<InetSocketAddress> addrs = cfg.getAddresses();
201205

@@ -419,7 +423,7 @@ private <T> T receive(ClientRequestFuture pendingReq, Function<PayloadInputChann
419423
long startTimeNanos = pendingReq.startTimeNanos;
420424

421425
try {
422-
ByteBuffer payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get();
426+
ByteBuffer payload = reqTimeout > 0 ? pendingReq.get(reqTimeout) : pendingReq.get();
423427

424428
T res = null;
425429
if (payload != null && payloadReader != null)
@@ -713,9 +717,6 @@ private void handshake(ProtocolVersion ver, String user, String pwd, Map<String,
713717
long reqId = -1L;
714718
long startTime = System.nanoTime();
715719

716-
eventListener.onHandshakeStart(new ConnectionDescription(sock.localAddress(), sock.remoteAddress(),
717-
new ProtocolContext(ver).toString(), null));
718-
719720
while (true) {
720721
ClientRequestFuture fut;
721722

@@ -733,10 +734,13 @@ private void handshake(ProtocolVersion ver, String user, String pwd, Map<String,
733734
pendingReqsLock.readLock().unlock();
734735
}
735736

737+
eventListener.onHandshakeStart(new ConnectionDescription(sock.localAddress(), sock.remoteAddress(),
738+
new ProtocolContext(ver).toString(), null));
739+
736740
handshakeReq(ver, user, pwd, userAttrs);
737741

738742
try {
739-
ByteBuffer buf = timeout > 0 ? fut.get(timeout) : fut.get();
743+
ByteBuffer buf = handshakeTimeout > 0 ? fut.get(handshakeTimeout) : fut.get();
740744

741745
BinaryInputStream res = BinaryStreams.inputStream(buf);
742746

modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
8989
else
9090
filters = new GridNioFilter[] {codecFilter};
9191

92-
connTimeout = cfg.getTimeout();
92+
connTimeout = cfg.getHandshakeTimeout();
9393

9494
try {
9595
srv = GridNioServer.<ByteBuffer>builder()

modules/core/src/test/java/org/apache/ignite/client/ClientConfigurationTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public class ClientConfigurationTest {
5959
public void testSerialization() throws IOException, ClassNotFoundException {
6060
ClientConfiguration target = new ClientConfiguration()
6161
.setAddresses("127.0.0.1:10800", "127.0.0.1:10801")
62-
.setTimeout(123)
62+
.setHandshakeTimeout(123)
63+
.setRequestTimeout(123)
6364
.setBinaryConfiguration(new BinaryConfiguration()
6465
.setClassNames(Collections.singleton("Person"))
6566
)

modules/core/src/test/java/org/apache/ignite/client/Comparers.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ public static boolean equal(ClientConfiguration a, Object o) {
3636

3737
return Arrays.equals(a.getAddresses(), b.getAddresses()) &&
3838
a.isTcpNoDelay() == b.isTcpNoDelay() &&
39-
a.getTimeout() == b.getTimeout() &&
39+
a.getHandshakeTimeout() == b.getHandshakeTimeout() &&
40+
a.getRequestTimeout() == b.getRequestTimeout() &&
4041
a.getSendBufferSize() == b.getSendBufferSize() &&
4142
a.getReceiveBufferSize() == b.getReceiveBufferSize();
4243
}

modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,8 @@ public void testUnreachableAddressDiscoveredDoesNotPreventClientInit() throws Ex
158158
// Config has good server address, client discovery returns unreachable address.
159159
// We expect the client to connect to the good address and ignore the unreachable one.
160160
ClientConfiguration ccfg = new ClientConfiguration()
161-
.setTimeout(2000)
161+
.setHandshakeTimeout(2000)
162+
.setRequestTimeout(2000)
162163
.setAddresses("127.0.0.1:" + DFLT_PORT);
163164

164165
IgniteClient client = Ignition.startClient(ccfg);

modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.ignite.configuration.ClientConfiguration;
4040
import org.apache.ignite.configuration.ClientConnectorConfiguration;
4141
import org.apache.ignite.configuration.IgniteConfiguration;
42+
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
4243
import org.apache.ignite.internal.IgniteInternalFuture;
4344
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
4445
import org.apache.ignite.internal.binary.streams.BinaryStreams;
@@ -67,7 +68,7 @@ public class TimeoutTest extends AbstractThinClientTest {
6768

6869
/** {@inheritDoc} */
6970
@Override protected ClientConfiguration getClientConfiguration() {
70-
return super.getClientConfiguration().setTimeout(TIMEOUT);
71+
return super.getClientConfiguration().setHandshakeTimeout(TIMEOUT).setRequestTimeout(TIMEOUT);
7172
}
7273

7374
/**
@@ -217,4 +218,103 @@ public void testClientTimeoutOnOperation() throws Exception {
217218
}
218219
}
219220
}
221+
222+
/**
223+
* Test that connection timeout is independent of request timeout during connection establishment.
224+
*/
225+
@Test
226+
@SuppressWarnings("ThrowableNotThrown")
227+
public void testConnectionTimeoutIndependentOfRequest() throws Exception {
228+
ServerSocket sock = new ServerSocket();
229+
sock.bind(new InetSocketAddress("127.0.0.1", DFLT_PORT));
230+
231+
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
232+
Socket accepted = null;
233+
234+
try {
235+
accepted = sock.accept();
236+
237+
while (!Thread.currentThread().isInterrupted())
238+
Thread.sleep(10);
239+
}
240+
finally {
241+
U.closeQuiet(accepted);
242+
}
243+
});
244+
245+
try {
246+
ClientConfiguration cfg = new ClientConfiguration()
247+
.setAddresses("127.0.0.1:" + DFLT_PORT)
248+
.setHandshakeTimeout(500)
249+
.setRequestTimeout(Integer.MAX_VALUE);
250+
251+
GridTestUtils.assertThrowsWithCause(
252+
() -> Ignition.startClient(cfg),
253+
IgniteFutureTimeoutCheckedException.class
254+
);
255+
}
256+
finally {
257+
fut.cancel();
258+
259+
U.closeQuiet(sock);
260+
}
261+
}
262+
263+
/**
264+
* Test that request timeout is independent of connection timeout during operations.
265+
*/
266+
@Test
267+
@SuppressWarnings("ThrowableNotThrown")
268+
public void testRequestTimeoutIndependentOfConnection() throws Exception {
269+
IgniteConfiguration igniteCfg = getConfiguration(getTestIgniteInstanceName());
270+
igniteCfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().setHandshakeTimeout(Integer.MAX_VALUE));
271+
272+
try (Ignite ignite = startGrid(igniteCfg)) {
273+
ClientConfiguration cfg = getClientConfiguration(ignite)
274+
.setHandshakeTimeout(Integer.MAX_VALUE)
275+
.setRequestTimeout(500);
276+
277+
try (IgniteClient client = Ignition.startClient(cfg)) {
278+
ClientCache<Object, Object> cache = client.getOrCreateCache("testTimeoutCache");
279+
280+
ClientCacheConfiguration txCacheCfg = new ClientCacheConfiguration()
281+
.setName("txCache")
282+
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
283+
284+
ClientCache<Object, Object> txCache = client.getOrCreateCache(txCacheCfg);
285+
286+
CyclicBarrier barrier = new CyclicBarrier(2);
287+
288+
IgniteInternalFuture<?> blockingThread = GridTestUtils.runAsync(() -> {
289+
try (ClientTransaction ignored1 = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
290+
txCache.put(1, "blocked");
291+
292+
barrier.await(2, TimeUnit.SECONDS);
293+
294+
// Wait for main thread to time out
295+
barrier.await(2, TimeUnit.SECONDS);
296+
}
297+
catch (Exception e) {
298+
throw new IgniteException(e);
299+
}
300+
});
301+
302+
barrier.await(2, TimeUnit.SECONDS);
303+
304+
try (ClientTransaction ignored1 = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
305+
GridTestUtils.assertThrowsWithCause(
306+
() -> txCache.put(1, "should timeout"),
307+
IgniteFutureTimeoutCheckedException.class
308+
);
309+
}
310+
311+
barrier.await(2, TimeUnit.SECONDS);
312+
313+
cache.put(1, "still works");
314+
assertEquals("still works", cache.get(1));
315+
316+
blockingThread.get();
317+
}
318+
}
319+
}
220320
}

modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ public void testBasic() throws Exception {
9090

9191
HandshakeStartEvent hsStartEv = (HandshakeStartEvent)evSet.get(HandshakeStartEvent.class);
9292

93-
assertEquals(hsStartEv.connectionDescription().protocol(), "ProtocolContext [version=" + ProtocolVersion.LATEST_VER
94-
+ ", features=[]]");
93+
assertEquals(hsStartEv.connectionDescription().protocol(), "ProtocolContext [version=" + srvVer + ", features=[]]");
9594
assertEquals(LOCALHOST, hsStartEv.connectionDescription().remoteAddress().getAddress());
9695
assertEquals(SRV_PORT, hsStartEv.connectionDescription().remoteAddress().getPort());
9796
assertEquals(LOCALHOST, hsStartEv.connectionDescription().localAddress().getAddress());

modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public void testClientSessionOutboundQueueLimit() throws Exception {
6868
try (
6969
IgniteClient cli = Ignition.startClient(new ClientConfiguration()
7070
.setAddresses("127.0.0.1:10800")
71-
.setTimeout(5000) // Server will drop packets intended for the client. So client can hang on handshake during reconnect.
71+
.setHandshakeTimeout(5000) // Server will drop packets intended for the client, it can hang on handshake during reconnect.
72+
.setRequestTimeout(5000)
7273
.setRetryLimit(1) // Let's not retry operations if the channel was closed while waiting for a response.
7374
.setEventListeners(new ConnectionEventListener() {
7475
@Override public void onConnectionClosed(ConnectionClosedEvent event) {

0 commit comments

Comments
 (0)