Skip to content

Commit 197a144

Browse files
authored
IGNITE-28498 Use MessageSerializer for Zookeeper discovery custom messages (#13000)
1 parent 6d78625 commit 197a144

15 files changed

Lines changed: 130 additions & 66 deletions

File tree

modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
package org.apache.ignite.internal;
1919

2020
import java.util.ArrayList;
21+
import java.util.Arrays;
2122
import java.util.Comparator;
2223
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
26+
import java.util.Objects;
2527
import java.util.Set;
28+
import java.util.stream.Collectors;
2629
import javax.annotation.processing.AbstractProcessor;
2730
import javax.annotation.processing.RoundEnvironment;
2831
import javax.annotation.processing.SupportedAnnotationTypes;
@@ -72,8 +75,12 @@ public class MessageProcessor extends AbstractProcessor {
7275
/** Externalizable message. */
7376
static final String MARSHALLABLE_MESSAGE_INTERFACE = "org.apache.ignite.internal.MarshallableMessage";
7477

75-
/** This is the only message with zero fields. A serializer must be generated due to restrictions in our communication process. */
76-
static final String HANDSHAKE_WAIT_MESSAGE = "org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage";
78+
/** Messages with no fields. A serializer must be generated due to restrictions in our communication process. */
79+
static final String[] EMPTY_MESSAGES = {
80+
"org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage",
81+
"org.apache.ignite.spi.discovery.zk.internal.ZkNoServersMessage",
82+
"org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Null",
83+
};
7784

7885
/** */
7986
private final Map<String, IgniteBiTuple<String, String>> enumMappersInUse = new HashMap<>();
@@ -83,7 +90,11 @@ public class MessageProcessor extends AbstractProcessor {
8390
*/
8491
@Override public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) {
8592
TypeMirror msgType = processingEnv.getElementUtils().getTypeElement(MESSAGE_INTERFACE).asType();
86-
TypeMirror handshakeWaitMsgType = processingEnv.getElementUtils().getTypeElement(HANDSHAKE_WAIT_MESSAGE).asType();
93+
List<TypeMirror> emptyMsgs = Arrays.stream(EMPTY_MESSAGES)
94+
.map(cls -> processingEnv.getElementUtils().getTypeElement(cls))
95+
.filter(Objects::nonNull)
96+
.map(Element::asType)
97+
.collect(Collectors.toList());
8798

8899
Map<TypeElement, List<VariableElement>> msgFields = new HashMap<>();
89100

@@ -101,7 +112,7 @@ public class MessageProcessor extends AbstractProcessor {
101112

102113
List<VariableElement> fields = orderedFields(clazz);
103114

104-
if (!fields.isEmpty() || processingEnv.getTypeUtils().isAssignable(clazz.asType(), handshakeWaitMsgType))
115+
if (!fields.isEmpty() || emptyMsgs.stream().anyMatch(t -> processingEnv.getTypeUtils().isAssignable(clazz.asType(), t)))
105116
msgFields.put(clazz, fields);
106117
}
107118

modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
104104
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
105105
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
106+
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
106107
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
107108
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
108109
import org.apache.ignite.internal.managers.failover.GridFailoverManager;
@@ -214,6 +215,7 @@
214215
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
215216
import org.apache.ignite.spi.IgniteSpi;
216217
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
218+
import org.apache.ignite.spi.discovery.DiscoverySpi;
217219
import org.apache.ignite.spi.discovery.isolated.IsolatedDiscoverySpi;
218220
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
219221
import org.apache.ignite.spi.tracing.TracingConfigurationManager;
@@ -1315,9 +1317,6 @@ else if (e instanceof IgniteCheckedException)
13151317
private void initMessageFactory() throws IgniteCheckedException {
13161318
MessageFactoryProvider[] msgs = ctx.plugins().extensions(MessageFactoryProvider.class);
13171319

1318-
if (msgs == null)
1319-
msgs = new MessageFactoryProvider[0];
1320-
13211320
List<MessageFactoryProvider> compMsgs = new ArrayList<>();
13221321

13231322
compMsgs.add(new CoreMessagesProvider(ctx.marshaller(), ctx.marshallerContext().jdkMarshaller(),
@@ -1330,6 +1329,15 @@ private void initMessageFactory() throws IgniteCheckedException {
13301329
compMsgs.add(f);
13311330
}
13321331

1332+
DiscoverySpi discoSpi = ctx.config().getDiscoverySpi();
1333+
1334+
if (discoSpi instanceof IgniteDiscoverySpi) {
1335+
MessageFactoryProvider discoMsgs = ((IgniteDiscoverySpi)discoSpi).messageFactoryProvider();
1336+
1337+
if (discoMsgs != null)
1338+
compMsgs.add(discoMsgs);
1339+
}
1340+
13331341
if (!compMsgs.isEmpty())
13341342
msgs = F.concat(msgs, compMsgs.toArray(new MessageFactoryProvider[compMsgs.size()]));
13351343

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.UUID;
2121
import org.apache.ignite.cluster.ClusterNode;
22+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
2223
import org.apache.ignite.spi.discovery.DiscoverySpi;
2324

2425
/**
@@ -57,4 +58,9 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
5758
* @param err Connection error.
5859
*/
5960
public void resolveCommunicationFailure(ClusterNode node, Exception err);
61+
62+
/** @return Message factory provider. */
63+
public default MessageFactoryProvider messageFactoryProvider() {
64+
return null;
65+
}
6066
}

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.ignite.internal.processors.query.h2.twostep.msg;
1919

2020
import org.apache.ignite.internal.GridKernalContext;
21-
import org.apache.ignite.internal.Order;
2221
import org.h2.value.Value;
2322
import org.h2.value.ValueNull;
2423

@@ -29,10 +28,6 @@ public class GridH2Null extends GridH2ValueMessage {
2928
/** */
3029
public static GridH2Null INSTANCE = new GridH2Null();
3130

32-
/** Dummy field to use codegen serializer. */
33-
@Order(0)
34-
byte dummy;
35-
3631
/**
3732
* Disallow new instance creation.
3833
*/
@@ -45,7 +40,6 @@ private GridH2Null() {
4540
return ValueNull.INSTANCE;
4641
}
4742

48-
4943
/** {@inheritDoc} */
5044
@Override public String toString() {
5145
return "NULL";

modules/zookeeper/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@
4040
<artifactId>ignite-core</artifactId>
4141
</dependency>
4242

43+
<dependency>
44+
<groupId>${project.groupId}</groupId>
45+
<artifactId>ignite-codegen</artifactId>
46+
<scope>provided</scope>
47+
</dependency>
48+
4349
<dependency>
4450
<groupId>org.apache.zookeeper</groupId>
4551
<artifactId>zookeeper</artifactId>

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.ignite.internal.util.typedef.internal.U;
4040
import org.apache.ignite.lang.IgniteBiTuple;
4141
import org.apache.ignite.lang.IgniteProductVersion;
42+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
4243
import org.apache.ignite.resources.LoggerResource;
4344
import org.apache.ignite.spi.IgniteSpiAdapter;
4445
import org.apache.ignite.spi.IgniteSpiConfiguration;
@@ -57,6 +58,7 @@
5758
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
5859
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
5960
import org.apache.ignite.spi.discovery.zk.internal.ZkIgnitePaths;
61+
import org.apache.ignite.spi.discovery.zk.internal.ZkMessageFactory;
6062
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
6163
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
6264
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryStatistics;
@@ -554,6 +556,11 @@ private ZookeeperClusterNode initLocalNode() {
554556
return locNode;
555557
}
556558

559+
/** {@inheritDoc} */
560+
@Override public MessageFactoryProvider messageFactoryProvider() {
561+
return new ZkMessageFactory();
562+
}
563+
557564
/** {@inheritDoc} */
558565
@Override public String toString() {
559566
return S.toString(ZookeeperDiscoverySpi.class, this);

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.ByteArrayInputStream;
2121
import java.io.ByteArrayOutputStream;
22-
import java.io.EOFException;
2322
import java.io.IOException;
2423
import java.io.InputStream;
2524
import java.io.OutputStream;
@@ -28,8 +27,6 @@
2827
import java.util.zip.InflaterInputStream;
2928
import org.apache.ignite.internal.direct.DirectMessageReader;
3029
import org.apache.ignite.internal.direct.DirectMessageWriter;
31-
import org.apache.ignite.internal.util.typedef.internal.U;
32-
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
3330
import org.apache.ignite.plugin.extensions.communication.Message;
3431
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
3532
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
@@ -42,25 +39,14 @@
4239
* Class is responsible for serializing discovery messages using RU-ready {@link MessageSerializer} mechanism.
4340
*/
4441
public class DiscoveryMessageParser {
45-
/** Leading byte for messages use {@link JdkMarshaller} for serialization. */
46-
// TODO: remove these flags after refactoring all discovery messages.
47-
private static final byte JAVA_SERIALIZATION = (byte)1;
48-
49-
/** Leading byte for messages use {@link MessageSerializer} for serialization. */
50-
private static final byte MESSAGE_SERIALIZATION = (byte)2;
51-
5242
/** Size for an intermediate buffer for serializing discovery messages. */
5343
private static final int MSG_BUFFER_SIZE = 100;
5444

55-
/** */
56-
private final JdkMarshaller jdkMarshaller;
57-
5845
/** */
5946
private final MessageFactory msgFactory;
6047

6148
/** */
62-
public DiscoveryMessageParser(JdkMarshaller jdkMarshaller, MessageFactory msgFactory) {
63-
this.jdkMarshaller = jdkMarshaller;
49+
public DiscoveryMessageParser(MessageFactory msgFactory) {
6450
this.msgFactory = msgFactory;
6551
}
6652

@@ -69,16 +55,7 @@ public byte[] marshalZip(DiscoverySpiCustomMessage msg) {
6955
ByteArrayOutputStream baos = new ByteArrayOutputStream();
7056

7157
try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
72-
if (msg instanceof Message) {
73-
out.write(MESSAGE_SERIALIZATION);
74-
75-
serializeMessage((Message)msg, out);
76-
}
77-
else {
78-
out.write(JAVA_SERIALIZATION);
79-
80-
U.marshal(jdkMarshaller, msg, out);
81-
}
58+
serializeMessage((Message)msg, out);
8259
}
8360
catch (Exception e) {
8461
throw new IgniteSpiException("Failed to serialize message: " + msg, e);
@@ -93,14 +70,6 @@ public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) {
9370
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
9471
InflaterInputStream in = new InflaterInputStream(bais)
9572
) {
96-
byte mode = (byte)in.read();
97-
98-
if (mode == JAVA_SERIALIZATION)
99-
return U.unmarshal(jdkMarshaller, in, U.gridClassLoader());
100-
101-
if (MESSAGE_SERIALIZATION != mode)
102-
throw new IOException("Received unexpected byte while reading discovery message: " + mode);
103-
10473
return (DiscoverySpiCustomMessage)deserializeMessage(in);
10574
}
10675
catch (Exception e) {
@@ -144,14 +113,15 @@ private Message deserializeMessage(InputStream in) throws IOException {
144113
do {
145114
int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining());
146115

147-
if (read == -1)
148-
throw new EOFException("Stream closed before message was fully read.");
149-
150-
msgBuf.limit(msgBuf.position() + read);
151-
msgBuf.rewind();
116+
if (read > 0) {
117+
msgBuf.limit(msgBuf.position() + read);
118+
msgBuf.rewind();
119+
}
152120

153121
finished = msgSer.readFrom(msg, msgReader);
154122

123+
assert read != -1 || finished : "Stream closed before message was fully read.";
124+
155125
if (!finished)
156126
msgBuf.compact();
157127
}

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,35 @@
1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

2020
import java.util.UUID;
21+
import org.apache.ignite.internal.Order;
2122
import org.apache.ignite.internal.util.typedef.internal.S;
23+
import org.apache.ignite.plugin.extensions.communication.Message;
24+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2225
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2326
import org.jetbrains.annotations.Nullable;
2427

2528
/**
2629
*
2730
*/
28-
class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
31+
class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage, Message {
2932
/** */
3033
private static final long serialVersionUID = 0L;
3134

3235
/** */
33-
final UUID futId;
36+
@Order(0)
37+
UUID futId;
3438

3539
/** */
36-
final long topVer;
40+
@Order(1)
41+
long topVer;
3742

3843
/** */
39-
transient ZkCommunicationErrorResolveResult res;
44+
ZkCommunicationErrorResolveResult res;
45+
46+
/** Constructor for {@link MessageFactory}. */
47+
public ZkCommunicationErrorResolveFinishMessage() {
48+
// No-op.
49+
}
4050

4151
/**
4252
* @param futId Future ID.

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,28 @@
1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

2020
import java.util.UUID;
21+
import org.apache.ignite.internal.Order;
2122
import org.apache.ignite.internal.util.typedef.internal.S;
23+
import org.apache.ignite.plugin.extensions.communication.Message;
24+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2225
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2326
import org.jetbrains.annotations.Nullable;
2427

2528
/**
2629
* Zk Communication Error Resolve Start Message.
2730
*/
28-
public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
31+
public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage, Message {
2932
/** */
3033
private static final long serialVersionUID = 0L;
3134

3235
/** */
33-
final UUID id;
36+
@Order(0)
37+
UUID id;
38+
39+
/** Constructor for {@link MessageFactory}. */
40+
public ZkCommunicationErrorResolveStartMessage() {
41+
// No-op.
42+
}
3443

3544
/**
3645
* @param id Unique ID.

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,32 @@
1717

1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

20+
import org.apache.ignite.internal.Order;
2021
import org.apache.ignite.internal.util.typedef.internal.S;
22+
import org.apache.ignite.plugin.extensions.communication.Message;
23+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2124
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2225
import org.jetbrains.annotations.Nullable;
2326

2427
/**
2528
* Zk Force Node Fail Message.
2629
*/
27-
public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
30+
public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage, Message {
2831
/** */
2932
private static final long serialVersionUID = 0L;
3033

3134
/** */
32-
final long nodeInternalId;
35+
@Order(0)
36+
long nodeInternalId;
3337

3438
/** */
35-
final String warning;
39+
@Order(1)
40+
String warning;
41+
42+
/** Constructor for {@link MessageFactory}. */
43+
public ZkForceNodeFailMessage() {
44+
// No-op.
45+
}
3646

3747
/**
3848
* @param nodeInternalId Node ID.

0 commit comments

Comments
 (0)