Skip to content

Commit ae7a1e6

Browse files
sergey-chugunov-1985zstan
authored andcommitted
IGNITE-27722 Split TcpDiscoveryIoSession to fix race in RingMessageWorker (#12731)
(cherry picked from commit a182326)
1 parent e7b9327 commit ae7a1e6

5 files changed

Lines changed: 277 additions & 39 deletions

File tree

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 72 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import org.apache.ignite.internal.util.typedef.C1;
101101
import org.apache.ignite.internal.util.typedef.F;
102102
import org.apache.ignite.internal.util.typedef.P1;
103+
import org.apache.ignite.internal.util.typedef.T2;
103104
import org.apache.ignite.internal.util.typedef.X;
104105
import org.apache.ignite.internal.util.typedef.internal.LT;
105106
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -2834,6 +2835,16 @@ private class RingMessageWorker extends MessageWorker<TcpDiscoveryAbstractMessag
28342835
/** Socket. */
28352836
private Socket sock;
28362837

2838+
// This serializer is used exclusively for serializing messages sent to clients,
2839+
// as it represents a special case within the RingMessageWorker workflow.
2840+
// Generally, both serialization and deserialization of messages should be handled by TcpDiscoveryIoSession.
2841+
// However, there are scenarios where the session is not available, yet messages still need to be sent to clients.
2842+
// A typical example is a single server with one or more connected clients.
2843+
// To address this, we use TcpDiscoveryMessageSerializer, which includes some code copied from TcpDiscoveryIoSession
2844+
// and can be instantiated independently of any active session.
2845+
/** */
2846+
private final TcpDiscoveryMessageSerializer clientMsgSer = new TcpDiscoveryMessageSerializer(spi);
2847+
28372848
/** IO session. */
28382849
private TcpDiscoveryIoSession ses;
28392850

@@ -3241,19 +3252,36 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
32413252
if (spi.ensured(msg))
32423253
msgHist.add(msg);
32433254

3255+
if (clientMsgWorkers.isEmpty())
3256+
return;
3257+
3258+
byte[] msgBytes = null;
3259+
3260+
if (!(msg instanceof TcpDiscoveryNodeAddedMessage)) {
3261+
try {
3262+
msgBytes = clientMsgSer.serializeMessage(msg);
3263+
}
3264+
catch (IgniteCheckedException | IOException e) {
3265+
U.error(log, "Failed to serialize message: " + msg, e);
3266+
3267+
return;
3268+
}
3269+
}
3270+
32443271
for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
3272+
TcpDiscoveryAbstractMessage msg0 = msg;
3273+
32453274
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
32463275
TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;
32473276

32483277
if (clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) {
3249-
msg = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
3278+
msg0 = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
32503279

3251-
prepareNodeAddedMessage(msg, clientMsgWorker.clientNodeId, null);
3280+
prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null);
32523281
}
32533282
}
32543283

3255-
// TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722
3256-
clientMsgWorker.addMessage(msg);
3284+
clientMsgWorker.addMessage(msg0, msgBytes);
32573285
}
32583286
}
32593287
}
@@ -7584,12 +7612,20 @@ private class StatisticsPrinter extends IgniteSpiThread {
75847612
}
75857613

75867614
/** */
7587-
private class ClientMessageWorker extends MessageWorker<TcpDiscoveryAbstractMessage> {
7615+
private class ClientMessageWorker extends MessageWorker<T2<TcpDiscoveryAbstractMessage, byte[]>> {
75887616
/** Node ID. */
75897617
private final UUID clientNodeId;
75907618

7619+
// The code responsible for sending and receiving messages to and from client nodes represents a special case in ServerImpl,
7620+
// as it is split into two separate components.
7621+
// One part, ClientMessageWorker, handles only message sending to clients and does not process responses.
7622+
// The other part, which reads messages from clients, is implemented in SocketReader.
7623+
// Due to this separation, we don't require a full TcpDiscoveryIoSession here
7624+
// and can instead extract just the message-writing functionality.
7625+
// At the same time, we aim to keep both reading and writing logic encapsulated within TcpDiscoveryIoSession.
7626+
// As a result, we need to copy some code from TcpDiscoveryIoSession into the new class, TcpDiscoveryMessageSerializer.
75917627
/** */
7592-
private final TcpDiscoveryIoSession ses;
7628+
private final TcpDiscoveryMessageSerializer clientMsgSer;
75937629

75947630
/** Socket. */
75957631
private final Socket sock;
@@ -7619,7 +7655,7 @@ private ClientMessageWorker(Socket sock, UUID clientNodeId, IgniteLogger log) {
76197655
this.sock = sock;
76207656
this.clientNodeId = clientNodeId;
76217657

7622-
ses = createSession(sock);
7658+
clientMsgSer = new TcpDiscoveryMessageSerializer(spi);
76237659

76247660
lastMetricsUpdateMsgTimeNanos = System.nanoTime();
76257661
}
@@ -7651,10 +7687,20 @@ void metrics(ClusterMetrics metrics) {
76517687
* @param msg Message.
76527688
*/
76537689
void addMessage(TcpDiscoveryAbstractMessage msg) {
7690+
addMessage(msg, null);
7691+
}
7692+
7693+
/**
7694+
* @param msg Message.
7695+
* @param msgBytes Optional message bytes.
7696+
*/
7697+
void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
7698+
T2<TcpDiscoveryAbstractMessage, byte[]> t = new T2<>(msg, msgBytes);
7699+
76547700
if (msg.highPriority())
7655-
queue.addFirst(msg);
7701+
queue.addFirst(t);
76567702
else
7657-
queue.add(msg);
7703+
queue.add(t);
76587704

76597705
DebugLogger log = messageLogger(msg);
76607706

@@ -7663,14 +7709,14 @@ void addMessage(TcpDiscoveryAbstractMessage msg) {
76637709
}
76647710

76657711
/** {@inheritDoc} */
7666-
@Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
7712+
@Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
76677713
boolean success = false;
76687714

7715+
TcpDiscoveryAbstractMessage msg = msgT.get1();
7716+
76697717
try {
76707718
assert msg.verified() : msg;
76717719

7672-
byte[] msgBytes = ses.serializeMessage(msg);
7673-
76747720
DebugLogger msgLog = messageLogger(msg);
76757721

76767722
if (msg instanceof TcpDiscoveryClientAckResponse) {
@@ -7692,8 +7738,8 @@ else if (msgLog.isDebugEnabled()) {
76927738
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
76937739
}
76947740

7695-
spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
7696-
spi.clientFailureDetectionTimeout() : spi.getSocketTimeout());
7741+
writeToSocket(msgT, spi.failureDetectionTimeoutEnabled() ? spi.clientFailureDetectionTimeout() :
7742+
spi.getSocketTimeout());
76977743
}
76987744
}
76997745
else {
@@ -7704,7 +7750,7 @@ else if (msgLog.isDebugEnabled()) {
77047750

77057751
assert topologyInitialized(msg) : msg;
77067752

7707-
spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false));
7753+
writeToSocket(msgT, spi.getEffectiveSocketTimeout(false));
77087754
}
77097755

77107756
boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage &&
@@ -7733,6 +7779,17 @@ else if (msgLog.isDebugEnabled()) {
77337779
}
77347780
}
77357781

7782+
/**
7783+
* @param msgT Message tuple.
7784+
* @param timeout Timeout.
7785+
*/
7786+
private void writeToSocket(T2<TcpDiscoveryAbstractMessage, byte[]> msgT, long timeout)
7787+
throws IgniteCheckedException, IOException {
7788+
byte[] msgBytes = msgT.get2() == null ? clientMsgSer.serializeMessage(msgT.get1()) : msgT.get2();
7789+
7790+
spi.writeToSocket(sock, msgT.get1(), msgBytes, timeout);
7791+
}
7792+
77367793
/**
77377794
* @param msg Message.
77387795
* @return {@code True} if topology initialized.

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.io.BufferedInputStream;
2121
import java.io.BufferedOutputStream;
2222
import java.io.ByteArrayInputStream;
23-
import java.io.ByteArrayOutputStream;
2423
import java.io.EOFException;
2524
import java.io.IOException;
2625
import java.io.InputStream;
@@ -71,7 +70,7 @@ public class TcpDiscoveryIoSession {
7170
static final byte MESSAGE_SERIALIZATION = (byte)2;
7271

7372
/** */
74-
private final TcpDiscoverySpi spi;
73+
final TcpDiscoverySpi spi;
7574

7675
/** Loads discovery messages classes during java deserialization. */
7776
private final ClassLoader clsLdr;
@@ -80,7 +79,7 @@ public class TcpDiscoveryIoSession {
8079
private final Socket sock;
8180

8281
/** */
83-
private final DirectMessageWriter msgWriter;
82+
final DirectMessageWriter msgWriter;
8483

8584
/** */
8685
private final DirectMessageReader msgReader;
@@ -92,7 +91,7 @@ public class TcpDiscoveryIoSession {
9291
private final CompositeInputStream in;
9392

9493
/** Intermediate buffer for serializing discovery messages. */
95-
private final ByteBuffer msgBuf;
94+
final ByteBuffer msgBuf;
9695

9796
/**
9897
* Creates a new discovery I/O session bound to the given socket.
@@ -238,25 +237,6 @@ <T> T readMessage() throws IgniteCheckedException, IOException {
238237
}
239238
}
240239

241-
/**
242-
* Serializes a discovery message into a byte array.
243-
*
244-
* @param msg Discovery message to serialize.
245-
* @return Serialized byte array containing the message data.
246-
* @throws IgniteCheckedException If serialization fails.
247-
* @throws IOException If serialization fails.
248-
*/
249-
byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException {
250-
if (!(msg instanceof Message))
251-
return U.marshal(spi.marshaller(), msg);
252-
253-
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
254-
serializeMessage((Message)msg, out);
255-
256-
return out.toByteArray();
257-
}
258-
}
259-
260240
/** @return Socket. */
261241
public Socket socket() {
262242
return sock;
@@ -269,7 +249,7 @@ public Socket socket() {
269249
* @param out Output stream to write serialized message.
270250
* @throws IOException If serialization fails.
271251
*/
272-
private void serializeMessage(Message m, OutputStream out) throws IOException {
252+
void serializeMessage(Message m, OutputStream out) throws IOException {
273253
MessageSerializer msgSer = spi.messageFactory().serializer(m.directType());
274254

275255
msgWriter.reset();
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.ignite.spi.discovery.tcp;
18+
19+
import java.io.ByteArrayOutputStream;
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.io.OutputStream;
23+
import java.net.Socket;
24+
import org.apache.ignite.IgniteCheckedException;
25+
import org.apache.ignite.internal.util.typedef.internal.U;
26+
import org.apache.ignite.plugin.extensions.communication.Message;
27+
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
28+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
29+
30+
/**
31+
* Class is responsible for serializing discovery messages using RU-ready {@link MessageSerializer} mechanism.
32+
* <p>
33+
* It is used in a special case: when server wants to send discovery messages to clients, it may not have a {@link TcpDiscoveryIoSession}
34+
* to serialize the messages.
35+
* This class enables server to serialize discovery messages anyway, duplicating serialization code from {@link TcpDiscoveryIoSession}.
36+
*/
37+
class TcpDiscoveryMessageSerializer extends TcpDiscoveryIoSession {
38+
/**
39+
* @param spi Discovery SPI instance.
40+
*/
41+
public TcpDiscoveryMessageSerializer(TcpDiscoverySpi spi) {
42+
super(new Socket() {
43+
@Override public OutputStream getOutputStream() throws IOException {
44+
return null;
45+
}
46+
47+
@Override public InputStream getInputStream() throws IOException {
48+
return null;
49+
}
50+
}, spi);
51+
}
52+
53+
/**
54+
* Serializes a discovery message into a byte array.
55+
*
56+
* @param msg Discovery message to serialize.
57+
* @return Serialized byte array containing the message data.
58+
* @throws IgniteCheckedException If serialization fails.
59+
* @throws IOException If serialization fails.
60+
*/
61+
byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException {
62+
if (!(msg instanceof Message))
63+
return U.marshal(spi.marshaller(), msg);
64+
65+
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
66+
serializeMessage((Message)msg, out);
67+
68+
return out.toByteArray();
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)