Skip to content

Commit a4bd886

Browse files
petrov-mgNSAmelchev
authored andcommitted
IGNITE-24224 Fixed NIO server broken message serialization with SSL enabled (#11819)
(cherry picked from commit 70a8f43)
1 parent 8014ec0 commit a4bd886

10 files changed

Lines changed: 565 additions & 211 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3321,7 +3321,7 @@ private static final class WriteRequestSystemImpl implements SessionWriteRequest
33213321
/**
33223322
*
33233323
*/
3324-
private static final class WriteRequestImpl implements SessionWriteRequest, SessionChangeRequest {
3324+
static final class WriteRequestImpl implements SessionWriteRequest, SessionChangeRequest {
33253325
/** */
33263326
private GridNioSession ses;
33273327

modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,15 @@ public void enabledProtocols(String... enabledProtos) {
230230
}
231231

232232
try {
233-
GridNioSslHandler hnd = new GridNioSslHandler(this,
233+
GridNioSslHandler hnd = new GridNioSslHandler(
234+
this,
234235
ses,
235236
engine,
236237
directBuf,
237238
order,
238239
log,
239240
handshake,
240-
sslMeta.encodedBuffer());
241+
sslMeta);
241242

242243
sslMeta.handler(hnd);
243244

@@ -257,10 +258,7 @@ public void enabledProtocols(String... enabledProtos) {
257258

258259
hnd.handshake();
259260

260-
ByteBuffer alreadyDecoded = sslMeta.decodedBuffer();
261-
262-
if (alreadyDecoded != null)
263-
proceedMessageReceived(ses, alreadyDecoded);
261+
processApplicationBuffer(ses, hnd.getApplicationBuffer());
264262
}
265263
catch (SSLException e) {
266264
onSessionOpenedException = e;
@@ -400,14 +398,7 @@ public ByteBuffer encrypt(GridNioSession ses, ByteBuffer input) throws SSLExcept
400398
if (hnd.isHandshakeFinished())
401399
hnd.flushDeferredWrites();
402400

403-
ByteBuffer appBuf = hnd.getApplicationBuffer();
404-
405-
appBuf.flip();
406-
407-
if (appBuf.hasRemaining())
408-
proceedMessageReceived(ses, appBuf);
409-
410-
appBuf.compact();
401+
processApplicationBuffer(ses, hnd.getApplicationBuffer());
411402

412403
if (hnd.isInboundDone() && !hnd.isOutboundDone()) {
413404
if (log.isDebugEnabled())
@@ -424,6 +415,16 @@ public ByteBuffer encrypt(GridNioSession ses, ByteBuffer input) throws SSLExcept
424415
}
425416
}
426417

418+
/** */
419+
private void processApplicationBuffer(GridNioSession ses, ByteBuffer appBuffer) throws IgniteCheckedException {
420+
appBuffer.flip();
421+
422+
if (appBuffer.hasRemaining())
423+
proceedMessageReceived(ses, appBuffer);
424+
425+
appBuffer.compact();
426+
}
427+
427428
/** {@inheritDoc} */
428429
@Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) throws IgniteCheckedException {
429430
GridNioSslHandler hnd = sslHandler(ses);

modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.ignite.internal.util.nio.GridNioSession;
3737
import org.apache.ignite.internal.util.typedef.internal.U;
3838
import org.apache.ignite.lang.IgniteInClosure;
39+
import org.jetbrains.annotations.Nullable;
3940

4041
import static javax.net.ssl.SSLEngineResult.HandshakeStatus;
4142
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
@@ -109,17 +110,19 @@ class GridNioSslHandler extends ReentrantLock {
109110
* @param directBuf Direct buffer flag.
110111
* @param order Byte order.
111112
* @param handshake is handshake required.
112-
* @param encBuf encoded buffer to be used.
113+
* @param sslMeta SSL meta.
113114
* @throws SSLException If exception occurred when starting SSL handshake.
114115
*/
115-
GridNioSslHandler(GridNioSslFilter parent,
116+
GridNioSslHandler(
117+
GridNioSslFilter parent,
116118
GridNioSession ses,
117119
SSLEngine engine,
118120
boolean directBuf,
119121
ByteOrder order,
120122
IgniteLogger log,
121123
boolean handshake,
122-
ByteBuffer encBuf) throws SSLException {
124+
GridSslMeta sslMeta
125+
) throws SSLException {
123126
assert parent != null;
124127
assert ses != null;
125128
assert engine != null;
@@ -145,32 +148,21 @@ class GridNioSslHandler extends ReentrantLock {
145148
// Allocate a little bit more so SSL engine would not return buffer overflow status.
146149
int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50;
147150

148-
outNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize);
149-
150-
outNetBuf.order(order);
151-
152-
inNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize);
153-
154-
inNetBuf.order(order);
155-
156-
if (encBuf != null) {
157-
encBuf.flip();
158-
159-
inNetBuf.put(encBuf); // Buffer contains bytes read but not handled by sslEngine at BlockingSslHandler.
160-
}
151+
outNetBuf = createBuffer(netBufSize, null);
161152

162153
// Initially buffer is empty.
163154
outNetBuf.position(0);
164155
outNetBuf.limit(0);
165156

166-
int appBufSize = Math.max(sslEngine.getSession().getApplicationBufferSize() + 50, netBufSize * 2);
167-
168-
appBuf = directBuf ? ByteBuffer.allocateDirect(appBufSize) : ByteBuffer.allocate(appBufSize);
157+
inNetBuf = createBuffer(netBufSize, sslMeta.encodedBuffer());
169158

170-
appBuf.order(order);
159+
appBuf = createBuffer(
160+
Math.max(sslEngine.getSession().getApplicationBufferSize() + 50, netBufSize * 2),
161+
sslMeta.decodedBuffer()
162+
);
171163

172164
if (log.isDebugEnabled())
173-
log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" + appBufSize + ']');
165+
log.debug("Started SSL session [netBufSize=" + outNetBuf.capacity() + ", appBufSize=" + appBuf.capacity() + ']');
174166
}
175167

176168
/**
@@ -682,6 +674,21 @@ private ByteBuffer copy(ByteBuffer original) {
682674
return cp;
683675
}
684676

677+
/** */
678+
private ByteBuffer createBuffer(int size, @Nullable ByteBuffer init) {
679+
if (init != null)
680+
size = Math.max(size, init.remaining());
681+
682+
ByteBuffer buf = directBuf ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
683+
684+
buf.order(order);
685+
686+
if (init != null)
687+
buf.put(init);
688+
689+
return buf;
690+
}
691+
685692
/**
686693
* Write request for cases while handshake is not finished yet.
687694
*/

modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public long tcpHandshake(
8181
BlockingTransport transport = stateProvider.isSslEnabled() ?
8282
new SslTransport(sslMeta, ch, directBuffer, log) : new TcpTransport(ch);
8383

84-
ByteBuffer buf = transport.recieveNodeId();
84+
ByteBuffer buf = transport.receiveNodeId();
8585

8686
if (buf == null)
8787
return NEED_WAIT;
@@ -98,7 +98,7 @@ else if (log.isDebugEnabled())
9898

9999
transport.sendHandshake(msg);
100100

101-
buf = transport.recieveAcknowledge();
101+
buf = transport.receiveAcknowledge();
102102

103103
long rcvCnt = buf.getLong(DIRECT_TYPE_SIZE);
104104

@@ -125,7 +125,7 @@ private abstract static class BlockingTransport {
125125
* @return Buffer with {@link NodeIdMessage}.
126126
* @throws IgniteCheckedException If failed.
127127
*/
128-
ByteBuffer recieveNodeId() throws IgniteCheckedException {
128+
ByteBuffer receiveNodeId() throws IgniteCheckedException {
129129
ByteBuffer buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE)
130130
.order(ByteOrder.LITTLE_ENDIAN);
131131

@@ -171,7 +171,7 @@ void sendHandshake(HandshakeMessage msg) throws IgniteCheckedException {
171171
* @return Buffer with message.
172172
* @throws IgniteCheckedException If failed.
173173
*/
174-
ByteBuffer recieveAcknowledge() throws IgniteCheckedException {
174+
ByteBuffer receiveAcknowledge() throws IgniteCheckedException {
175175
ByteBuffer buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE)
176176
.order(ByteOrder.LITTLE_ENDIAN);
177177

@@ -333,8 +333,11 @@ private static class SslTransport extends BlockingTransport {
333333

334334
ByteBuffer inBuf = handler.inputBuffer();
335335

336-
if (inBuf.position() > 0)
336+
if (inBuf.position() > 0) {
337+
inBuf.flip();
338+
337339
sslMeta.encodedBuffer(inBuf);
340+
}
338341
}
339342

340343
/**
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.util.nio;
19+
20+
import java.util.Arrays;
21+
import java.util.Collections;
22+
import java.util.Map;
23+
import java.util.UUID;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.ThreadLocalRandom;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.function.Supplier;
28+
import org.apache.ignite.cluster.ClusterNode;
29+
import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
30+
import org.apache.ignite.internal.util.typedef.internal.U;
31+
import org.apache.ignite.lang.IgniteRunnable;
32+
import org.apache.ignite.plugin.extensions.communication.Message;
33+
import org.apache.ignite.spi.communication.CommunicationListener;
34+
import org.apache.ignite.spi.communication.CommunicationSpi;
35+
import org.apache.ignite.spi.communication.GridAbstractCommunicationSelfTest;
36+
import org.apache.ignite.spi.communication.TestVolatilePayloadMessage;
37+
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
38+
import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
39+
import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
40+
import org.apache.ignite.testframework.GridTestUtils;
41+
import org.junit.Test;
42+
43+
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
44+
45+
/**
46+
* Tests the case when regular communications messages are sent along with the last handshake messages and SSL is enabled.
47+
* It asserts that if not all received by network bytes are processed by {@link BlockingSslHandler} during the handshake
48+
* phase, then all remaining bytes are properly copied to {@code GridNioSslHandler}, which replaces
49+
* {@link BlockingSslHandler} after the handshake phase.
50+
* The steps that can lead to mentioned above conditions:
51+
* <p>
52+
* 1. Node B sends a MESSAGE to Node A and stores it in the local recovery descriptor until an acknowledgment is received
53+
* from Node A.
54+
* <p>
55+
* 2. Node A, for whatever reason, reestablishes connection with node B and starts handshake negotiation.
56+
* <p>
57+
* 3. Node B during the final phase of handshake sends {@link RecoveryLastReceivedMessage} and resends not acknowledged
58+
* MESSAGE from step 1. But all sent bytes are divided into two network packets. Let's assume that the first packet
59+
* contains all bytes related to {@link RecoveryLastReceivedMessage} and only half of the MESSAGE bytes.
60+
* <p>
61+
* 4. Node A decodes {@link RecoveryLastReceivedMessage} from the received network packet and finishes the handshake.
62+
* But the MESSAGE cannot be processed because not enough bytes were received. So we must save remaining bytes from the
63+
* first network packet, wait for the next network packet and finish MESSAGE deserialization.
64+
*
65+
*/
66+
public class TcpCommunicationSpiSslVolatilePayloadTest extends GridAbstractCommunicationSelfTest<CommunicationSpi<Message>> {
67+
/** */
68+
private static final int TEST_ITERATION_CNT = 1000;
69+
70+
/** The number of messages intended to fill the network buffer during last handshake message sending. */
71+
private static final int RECOVERY_DESCRIPTOR_QUEUE_MESSAGE_CNT = 50;
72+
73+
/** */
74+
private static final AtomicInteger msgCreatedCntr = new AtomicInteger();
75+
76+
/** */
77+
private static final AtomicInteger msgReceivedCntr = new AtomicInteger();
78+
79+
/** */
80+
private static final Map<Integer, TestVolatilePayloadMessage> messages = new ConcurrentHashMap<>();
81+
82+
/** {@inheritDoc} */
83+
@Override protected CommunicationSpi<Message> getSpi(int idx) {
84+
return new TcpCommunicationSpi().setLocalPort(GridTestUtils.getNextCommPort(getClass()))
85+
.setIdleConnectionTimeout(2000)
86+
.setTcpNoDelay(true);
87+
}
88+
89+
/** {@inheritDoc} */
90+
@Override protected CommunicationListener<Message> createMessageListener(UUID nodeId) {
91+
return new TestCommunicationListener();
92+
}
93+
94+
/** {@inheritDoc} */
95+
@Override protected Map<Short, Supplier<Message>> customMessageTypes() {
96+
return Collections.singletonMap(TestVolatilePayloadMessage.DIRECT_TYPE, TestVolatilePayloadMessage::new);
97+
}
98+
99+
/** {@inheritDoc} */
100+
@Override protected boolean isSslEnabled() {
101+
return true;
102+
}
103+
104+
/** */
105+
@Test
106+
public void test() throws Exception {
107+
ClusterNode from = nodes.get(0);
108+
ClusterNode to = nodes.get(1);
109+
110+
for (int i = 0; i < TEST_ITERATION_CNT; i++) {
111+
// Force connection to be established.
112+
sendMessage(from, to, createMessage());
113+
114+
GridNioRecoveryDescriptor fromDesc = extractRecoveryDescriptor(from);
115+
GridNioRecoveryDescriptor toDesc = extractRecoveryDescriptor(to);
116+
117+
// Stores multiple dummy messages in a recovery descriptor. When the connection is restored, they will be
118+
// written to the network buffer along with the last handshake message.
119+
// See TcpHandshakeExecutor#receiveAcknowledge
120+
for (int j = 0; j < RECOVERY_DESCRIPTOR_QUEUE_MESSAGE_CNT; j++)
121+
toDesc.add(new GridNioServer.WriteRequestImpl(toDesc.session(), createMessage(), false, null));
122+
123+
// Close connection to re-initiate handshake between nodes.
124+
if (fromDesc.session() != null)
125+
fromDesc.session().close();
126+
}
127+
128+
assertTrue(waitForCondition(() -> msgCreatedCntr.get() == msgReceivedCntr.get(), 5000));
129+
}
130+
131+
/** */
132+
public GridNioRecoveryDescriptor extractRecoveryDescriptor(ClusterNode node) throws Exception {
133+
CommunicationSpi<Message> spi = spis.get(node.id());
134+
135+
GridNioServerWrapper wrapper = U.field(spi, "nioSrvWrapper");
136+
137+
assertTrue(waitForCondition(() -> !wrapper.recoveryDescs().values().isEmpty(), getTestTimeout()));
138+
139+
return wrapper.recoveryDescs().values().stream().findFirst().get();
140+
}
141+
142+
/** */
143+
private Message createMessage() {
144+
byte[] payload = new byte[ThreadLocalRandom.current().nextInt(10, 1024)];
145+
146+
ThreadLocalRandom.current().nextBytes(payload);
147+
148+
TestVolatilePayloadMessage msg = new TestVolatilePayloadMessage(msgCreatedCntr.getAndIncrement(), payload);
149+
150+
messages.put(msg.index(), msg);
151+
152+
return msg;
153+
}
154+
155+
/** */
156+
private void sendMessage(ClusterNode from, ClusterNode to, Message msg) {
157+
spis.get(from.id()).sendMessage(to, msg);
158+
}
159+
160+
/** */
161+
private static class TestCommunicationListener implements CommunicationListener<Message> {
162+
/** {@inheritDoc} */
163+
@Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) {
164+
msgC.run();
165+
166+
if (msg instanceof TestVolatilePayloadMessage) {
167+
TestVolatilePayloadMessage testMsg = (TestVolatilePayloadMessage)msg;
168+
169+
TestVolatilePayloadMessage expMsg = messages.get(testMsg.index());
170+
171+
assertNotNull(expMsg);
172+
173+
assertTrue(Arrays.equals(expMsg.payload(), testMsg.payload()));
174+
175+
msgReceivedCntr.incrementAndGet();
176+
}
177+
}
178+
179+
/** {@inheritDoc} */
180+
@Override public void onDisconnected(UUID nodeId) {
181+
// No-op.
182+
}
183+
}
184+
}

0 commit comments

Comments
 (0)