Skip to content

Commit 094f54c

Browse files
shishkoviljazstan
authored andcommitted
IGNITE-27652 Refactor RingMessageWorker#sendMessageToClients (#12663)
(cherry picked from commit 1f175f9)
1 parent ad8ccf7 commit 094f54c

1 file changed

Lines changed: 10 additions & 53 deletions

File tree

  • modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 10 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@
100100
import org.apache.ignite.internal.util.typedef.C1;
101101
import org.apache.ignite.internal.util.typedef.F;
102102
import org.apache.ignite.internal.util.typedef.P1;
103-
import org.apache.ignite.internal.util.typedef.T2;
104103
import org.apache.ignite.internal.util.typedef.X;
105104
import org.apache.ignite.internal.util.typedef.internal.LT;
106105
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -3242,46 +3241,19 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
32423241
if (spi.ensured(msg))
32433242
msgHist.add(msg);
32443243

3245-
byte[] msgBytes = null;
3246-
32473244
for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
3248-
if (msgBytes == null) {
3249-
try {
3250-
msgBytes = clientMsgWorker.ses.serializeMessage(msg);
3251-
}
3252-
catch (IgniteCheckedException | IOException e) {
3253-
U.error(log, "Failed to serialize message to a client: " + msg + ", recepient " +
3254-
"client id: " + clientMsgWorker.clientNodeId, e);
3255-
3256-
break;
3257-
}
3258-
}
3259-
3260-
TcpDiscoveryAbstractMessage msg0 = msg;
3261-
byte[] msgBytes0 = msgBytes;
3262-
32633245
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
32643246
TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;
32653247

3266-
TcpDiscoveryNode node = nodeAddedMsg.node();
3267-
3268-
if (clientMsgWorker.clientNodeId.equals(node.id())) {
3269-
try {
3270-
// TODO: https://issues.apache.org/jira/browse/IGNITE-27556 refactor serialization.
3271-
msg0 = U.unmarshal(spi.marshaller(), msgBytes,
3272-
U.resolveClassLoader(spi.ignite().configuration()));
3273-
3274-
prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null);
3248+
if (clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) {
3249+
msg = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
32753250

3276-
msgBytes0 = null;
3277-
}
3278-
catch (IgniteCheckedException e) {
3279-
U.error(log, "Failed to create message copy: " + msg, e);
3280-
}
3251+
prepareNodeAddedMessage(msg, clientMsgWorker.clientNodeId, null);
32813252
}
32823253
}
32833254

3284-
clientMsgWorker.addMessage(msg0, msgBytes0);
3255+
// TODO Investigate possible optimizations: https://issues.apache.org/jira/browse/IGNITE-27722
3256+
clientMsgWorker.addMessage(msg);
32853257
}
32863258
}
32873259
}
@@ -7610,7 +7582,7 @@ private class StatisticsPrinter extends IgniteSpiThread {
76107582
}
76117583

76127584
/** */
7613-
private class ClientMessageWorker extends MessageWorker<T2<TcpDiscoveryAbstractMessage, byte[]>> {
7585+
private class ClientMessageWorker extends MessageWorker<TcpDiscoveryAbstractMessage> {
76147586
/** Node ID. */
76157587
private final UUID clientNodeId;
76167588

@@ -7677,20 +7649,10 @@ void metrics(ClusterMetrics metrics) {
76777649
* @param msg Message.
76787650
*/
76797651
void addMessage(TcpDiscoveryAbstractMessage msg) {
7680-
addMessage(msg, null);
7681-
}
7682-
7683-
/**
7684-
* @param msg Message.
7685-
* @param msgBytes Optional message bytes.
7686-
*/
7687-
void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
7688-
T2<TcpDiscoveryAbstractMessage, byte[]> t = new T2<>(msg, msgBytes);
7689-
76907652
if (msg.highPriority())
7691-
queue.addFirst(t);
7653+
queue.addFirst(msg);
76927654
else
7693-
queue.add(t);
7655+
queue.add(msg);
76947656

76957657
DebugLogger log = messageLogger(msg);
76967658

@@ -7699,18 +7661,13 @@ void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
76997661
}
77007662

77017663
/** {@inheritDoc} */
7702-
@Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
7664+
@Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
77037665
boolean success = false;
77047666

7705-
TcpDiscoveryAbstractMessage msg = msgT.get1();
7706-
77077667
try {
77087668
assert msg.verified() : msg;
77097669

7710-
byte[] msgBytes = msgT.get2();
7711-
7712-
if (msgBytes == null)
7713-
msgBytes = ses.serializeMessage(msg);
7670+
byte[] msgBytes = ses.serializeMessage(msg);
77147671

77157672
DebugLogger msgLog = messageLogger(msg);
77167673

0 commit comments

Comments
 (0)