Skip to content

Commit c9390c5

Browse files
committed
reimpl
1 parent e51c2c1 commit c9390c5

19 files changed

Lines changed: 57 additions & 41 deletions

modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.lang.reflect.Constructor;
2121
import org.apache.ignite.IgniteException;
22+
import org.apache.ignite.internal.binary.BinaryMarshaller;
2223
import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
2324
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
2425
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
@@ -227,8 +228,10 @@
227228
import org.apache.ignite.internal.util.distributed.FullMessage;
228229
import org.apache.ignite.internal.util.distributed.InitMessage;
229230
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
231+
import org.apache.ignite.internal.util.typedef.internal.U;
230232
import org.apache.ignite.lang.IgniteProductVersion;
231233
import org.apache.ignite.marshaller.Marshaller;
234+
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
232235
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
233236
import org.apache.ignite.plugin.extensions.communication.Message;
234237
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -285,11 +288,14 @@ public class CoreMessagesProvider implements MessageFactoryProvider {
285288
/** Handshake wait message type. */
286289
public static final short HANDSHAKE_WAIT_MSG_TYPE = HANDSHAKE_MSG_TYPE + 1;
287290

288-
/** Custom data marshaller. */
289-
private final Marshaller marsh;
291+
/** Binary marshaller. */
292+
private final Marshaller schemaAwareMarhaller;
290293

291-
/** Class loader for the custom data marshalling. */
292-
private final ClassLoader clsLdr;
294+
/** Binary marshaller. */
295+
private final Marshaller schemaLessMarshaller;
296+
297+
/** Resolved classloader. */
298+
private final ClassLoader resolvedClsLdr;
293299

294300
/** */
295301
private short msgIdx;
@@ -298,12 +304,14 @@ public class CoreMessagesProvider implements MessageFactoryProvider {
298304
private @Nullable MessageFactory factory;
299305

300306
/**
301-
* @param marsh Custom data marshaller.
302-
* @param clsLdr Class loader for the custom data marshalling.
307+
* @param schemaAwareMarhaller Schema-aware marshaller like {@link BinaryMarshaller}.
308+
* @param schemaLessMarshaller Pure, schemaless marshaller like {@link JdkMarshaller}.
309+
* @param resolvedClsLdr Resolved classloader.
303310
*/
304-
public CoreMessagesProvider(Marshaller marsh, ClassLoader clsLdr) {
305-
this.marsh = marsh;
306-
this.clsLdr = clsLdr;
311+
public CoreMessagesProvider(Marshaller schemaAwareMarhaller, Marshaller schemaLessMarshaller, ClassLoader resolvedClsLdr) {
312+
this.schemaAwareMarhaller = schemaAwareMarhaller;
313+
this.schemaLessMarshaller = schemaLessMarshaller;
314+
this.resolvedClsLdr = resolvedClsLdr;
307315
}
308316

309317
/** {@inheritDoc} */
@@ -632,8 +640,13 @@ public CoreMessagesProvider(Marshaller marsh, ClassLoader clsLdr) {
632640
register(WalStateAckMessage.class);
633641
}
634642

635-
/** Registers message incrementing {@link #msgIdx}. */
643+
/** */
636644
private <T extends Message> void register(Class<T> cls) {
645+
register(cls, schemaLessMarshaller, U.gridClassLoader());
646+
}
647+
648+
/** Registers message using ane incrementing {@link #msgIdx} as the message id/type. */
649+
private <T extends Message> void register(Class<T> cls, Marshaller marsh, ClassLoader clsLdr) {
637650
Constructor<T> ctor;
638651
MessageSerializer<T> serializer;
639652

modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1320,7 +1320,8 @@ private void initMessageFactory() throws IgniteCheckedException {
13201320

13211321
List<MessageFactoryProvider> compMsgs = new ArrayList<>();
13221322

1323-
compMsgs.add(new CoreMessagesProvider(ctx.marshaller(), U.resolveClassLoader(ctx.config())));
1323+
compMsgs.add(new CoreMessagesProvider(ctx.marshaller(), ctx.marshallerContext().jdkMarshaller(),
1324+
U.resolveClassLoader(ctx.config())));
13241325

13251326
for (IgniteComponentType compType : IgniteComponentType.values()) {
13261327
MessageFactoryProvider f = compType.messageFactory();

modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class DirectMarshallingMessagesTest extends GridCommonAbstractTest {
4646
/** Message factory. */
4747
private final MessageFactory msgFactory =
4848
new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {
49-
new CoreMessagesProvider(jdk(), U.gridClassLoader()),
49+
new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()),
5050
factory -> factory.register(
5151
TestNestedContainersMessage.TYPE,
5252
TestNestedContainersMessage::new,

modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class CompressedMessageTest {
4747
@Test
4848
public void testWriteReadHugeMessage() {
4949
MessageFactory msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{
50-
new CoreMessagesProvider(jdk(), U.gridClassLoader())});
50+
new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader())});
5151

5252
DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
5353

modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoCommunicationMessageSerializationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
public class IgniteIoCommunicationMessageSerializationTest extends AbstractMessageSerializationTest {
3535
/** {@inheritDoc} */
3636
@Override protected MessageFactoryProvider messageFactory() {
37-
return new CoreMessagesProvider(jdk(), U.gridClassLoader());
37+
return new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader());
3838
}
3939

4040
/** {@inheritDoc} */

modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@
2828
public class IgniteDiscoveryMessageSerializationTest extends AbstractMessageSerializationTest {
2929
/** {@inheritDoc} */
3030
@Override protected MessageFactoryProvider messageFactory() {
31-
return new CoreMessagesProvider(jdk(), U.gridClassLoader());
31+
return new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader());
3232
}
3333
}

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void testCacheContinuousQueryEntrySerialization() {
150150
e0.markFiltered();
151151

152152
IgniteMessageFactoryImpl msgFactory =
153-
new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new CoreMessagesProvider(jdk(), U.gridClassLoader())});
153+
new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader())});
154154

155155
ByteBuffer buf = ByteBuffer.allocate(4096);
156156
DirectMessageWriter writer = new DirectMessageWriter(msgFactory);

modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private void startSpis() throws Exception {
154154
GridSpiTestContext ctx = initSpiContext();
155155

156156
ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {
157-
new CoreMessagesProvider(jdk(), U.gridClassLoader()), customMessageFactory()}));
157+
new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), customMessageFactory()}));
158158

159159
ctx.setLocalNode(node);
160160

modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ private void startSpis(MessageListener lsnr) throws Exception {
434434
GridSpiTestContext ctx = initSpiContext();
435435

436436
ctx.messageFactory(new IgniteMessageFactoryImpl(
437-
new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
437+
new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
438438
);
439439

440440
ctx.setLocalNode(node);

modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ private TcpCommunicationSpi initializeSpi(GridSpiTestContext ctx,
252252
node.setId(rsrcs.getNodeId());
253253

254254
ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{
255-
new CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}));
255+
new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}));
256256

257257
ctx.setLocalNode(node);
258258

0 commit comments

Comments
 (0)