Skip to content

Commit 14e9498

Browse files
authored
IGNITE-28334 Replace CalciteMarshalableMessage with MarshallableMessage (#13022)
1 parent f902ab4 commit 14e9498

45 files changed

Lines changed: 318 additions & 490 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@
3232
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
3333
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
3434
import org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
35-
import org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
3635
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
3736
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
3837
import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage;
3938
import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage;
4039
import org.apache.ignite.internal.processors.query.calcite.message.QueryCloseMessage;
40+
import org.apache.ignite.internal.processors.query.calcite.message.QueryInboxCloseMessage;
4141
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
4242
import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
4343
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
@@ -165,7 +165,7 @@ public void queryRegistry(QueryRegistry qryRegistry) {
165165

166166
/** {@inheritDoc} */
167167
@Override public void closeInbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException {
168-
messageService().send(nodeId, new InboxCloseMessage(qryId, fragmentId, exchangeId));
168+
messageService().send(nodeId, new QueryInboxCloseMessage(qryId, fragmentId, exchangeId));
169169
}
170170

171171
/** {@inheritDoc} */
@@ -188,8 +188,8 @@ public void queryRegistry(QueryRegistry qryRegistry) {
188188

189189
/** {@inheritDoc} */
190190
@Override public void init() {
191-
messageService().register((n, m) -> onMessage(n, (InboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
192-
messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)m), MessageType.QUERY_ACKNOWLEDGE_MESSAGE);
191+
messageService().register((n, m) -> onMessage(n, (QueryInboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
192+
messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)m), MessageType.QUERY_BATCH_ACKNOWLEDGE_MESSAGE);
193193
messageService().register((n, m) -> onMessage(n, (QueryBatchMessage)m), MessageType.QUERY_BATCH_MESSAGE);
194194
messageService().register((n, m) -> onMessage(n, (QueryCloseMessage)m), MessageType.QUERY_CLOSE_MESSAGE);
195195
}
@@ -221,7 +221,7 @@ public void queryRegistry(QueryRegistry qryRegistry) {
221221
}
222222

223223
/** */
224-
protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
224+
protected void onMessage(UUID nodeId, QueryInboxCloseMessage msg) {
225225
Collection<Inbox<?>> inboxes = mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
226226

227227
if (!F.isEmpty(inboxes)) {

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMarshalableMessage.java renamed to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteContextMarshallableMessage.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@
1919

2020
import org.apache.ignite.IgniteCheckedException;
2121
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
22+
import org.apache.ignite.plugin.extensions.communication.Message;
2223

23-
/**
24-
*
25-
*/
26-
public interface CalciteMarshalableMessage extends CalciteMessage {
24+
/** A Calcite engine related message which requires marshalling with context. */
25+
public interface CalciteContextMarshallableMessage extends Message {
2726
/**
2827
* Prepares the message before sending.
2928
*
@@ -35,6 +34,7 @@ public interface CalciteMarshalableMessage extends CalciteMessage {
3534
* Prepares the message before processing.
3635
*
3736
* @param ctx Cache shared context.
37+
* @param clsLdr Class loader.
3838
*/
39-
void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException;
39+
void finishUnmarshal(GridCacheSharedContext<?, ?> ctx, ClassLoader clsLdr) throws IgniteCheckedException;
4040
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,11 @@
1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

2020
import java.util.UUID;
21-
import org.apache.ignite.IgniteCheckedException;
2221
import org.apache.ignite.internal.Order;
23-
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
24-
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
25-
import org.apache.ignite.internal.util.typedef.internal.U;
26-
import org.jetbrains.annotations.Nullable;
22+
import org.apache.ignite.internal.managers.communication.ErrorMessage;
2723

28-
/**
29-
*
30-
*/
31-
public class CalciteErrorMessage implements CalciteMarshalableMessage {
24+
/** */
25+
public class CalciteErrorMessage extends ErrorMessage {
3226
/** */
3327
@Order(0)
3428
UUID qryId;
@@ -37,62 +31,28 @@ public class CalciteErrorMessage implements CalciteMarshalableMessage {
3731
@Order(1)
3832
long fragmentId;
3933

40-
/** Error bytes. */
41-
@Order(2)
42-
@GridToStringExclude
43-
@Nullable public byte[] errBytes;
44-
45-
/** Error. */
46-
private @Nullable Throwable err;
47-
4834
/** */
4935
public CalciteErrorMessage() {
5036
// No-op.
5137
}
5238

5339
/** */
5440
public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) {
41+
super(err);
42+
5543
assert err != null;
5644

5745
this.qryId = qryId;
5846
this.fragmentId = fragmentId;
59-
this.err = err;
6047
}
6148

62-
/**
63-
* @return Query ID.
64-
*/
49+
/** @return Query ID. */
6550
public UUID queryId() {
6651
return qryId;
6752
}
6853

69-
/**
70-
* @return Fragment ID.
71-
*/
54+
/** @return Fragment ID. */
7255
public long fragmentId() {
7356
return fragmentId;
7457
}
75-
76-
/** */
77-
public @Nullable Throwable error() {
78-
return err;
79-
}
80-
81-
/** {@inheritDoc} */
82-
@Override public MessageType type() {
83-
return MessageType.QUERY_ERROR_MESSAGE;
84-
}
85-
86-
87-
/** {@inheritDoc} */
88-
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
89-
if (err != null)
90-
errBytes = U.marshal(ctx.marshaller(), err);
91-
}
92-
93-
/** {@inheritDoc} */
94-
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
95-
if (errBytes != null)
96-
err = U.unmarshal(ctx.marshaller(), errBytes, U.resolveClassLoader(ctx.cache().context().gridConfig()));
97-
}
9858
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessage.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,21 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

20-
import java.util.function.Supplier;
20+
import org.apache.ignite.internal.MarshallableMessage;
21+
import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
2122
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
22-
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
2323

2424
/**
2525
* Message factory.
2626
*/
27-
public class CalciteMessageFactory implements MessageFactoryProvider {
27+
public class CalciteMessageFactory extends AbstractMarshallableMessageFactoryProvider {
2828
/** {@inheritDoc} */
29-
@SuppressWarnings({"unchecked", "rawtypes"})
3029
@Override public void registerAll(MessageFactory factory) {
31-
for (MessageType type : MessageType.values())
32-
factory.register(type.directType(), (Supplier)type.factory(), type.serializer());
33-
}
34-
35-
/**
36-
* Produces a value message.
37-
*/
38-
public static ValueMessage asMessage(Object val) {
39-
if (val == null)
40-
return null;
41-
42-
return new GenericValueMessage(val);
30+
for (MessageType type : MessageType.values()) {
31+
if (MarshallableMessage.class.isAssignableFrom(type.messageClass()))
32+
register(factory, type.messageClass(), type.directType(), schemaAwareMarsh, resolvedClsLdr);
33+
else
34+
register(factory, type.messageClass(), type.directType(), dfltMarsh, dftlClsLdr);
35+
}
4336
}
4437
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

2020
import java.util.UUID;
21+
import org.apache.ignite.plugin.extensions.communication.Message;
2122

2223
/**
2324
* Execution context is used to determine a stripe where to process a message.
2425
*/
25-
public interface ExecutionContextAware extends CalciteMessage {
26+
public interface ExecutionContextAware extends Message {
2627
/**
2728
* @return Query ID.
2829
*/

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@
1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

2020
import org.apache.ignite.IgniteCheckedException;
21+
import org.apache.ignite.internal.MarshallableMessage;
2122
import org.apache.ignite.internal.Order;
22-
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
2323
import org.apache.ignite.internal.util.typedef.internal.U;
24+
import org.apache.ignite.marshaller.Marshaller;
2425

25-
/**
26-
*
27-
*/
28-
public final class GenericValueMessage implements ValueMessage {
26+
/** */
27+
public final class GenericValueMessage implements MarshallableMessage {
2928
/** */
3029
private Object val;
3130

@@ -35,33 +34,30 @@ public final class GenericValueMessage implements ValueMessage {
3534

3635
/** */
3736
public GenericValueMessage() {
38-
37+
// No-op.
3938
}
4039

4140
/** */
4241
public GenericValueMessage(Object val) {
4342
this.val = val;
4443
}
4544

46-
/** {@inheritDoc} */
47-
@Override public Object value() {
45+
/** */
46+
public Object value() {
4847
return val;
4948
}
5049

5150
/** {@inheritDoc} */
52-
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
51+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
5352
if (val != null && serialized == null)
54-
serialized = U.marshal(ctx, val);
53+
serialized = U.marshal(marsh, val);
5554
}
5655

5756
/** {@inheritDoc} */
58-
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
57+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
5958
if (serialized != null && val == null)
60-
val = U.unmarshal(ctx, serialized, U.resolveClassLoader(ctx.gridConfig()));
61-
}
59+
val = U.unmarshal(marsh, serialized, clsLdr);
6260

63-
/** {@inheritDoc} */
64-
@Override public MessageType type() {
65-
return MessageType.GENERIC_VALUE_MESSAGE;
61+
serialized = null;
6662
}
6763
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

2020
import java.util.UUID;
21+
import org.apache.ignite.plugin.extensions.communication.Message;
2122

2223
/**
2324
*
@@ -27,5 +28,5 @@ public interface MessageListener {
2728
* @param nodeId Sender node ID.
2829
* @param msg Message.
2930
*/
30-
void onMessage(UUID nodeId, CalciteMessage msg);
31+
void onMessage(UUID nodeId, Message msg);
3132
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.UUID;
2121
import org.apache.ignite.IgniteCheckedException;
2222
import org.apache.ignite.internal.processors.query.calcite.util.Service;
23+
import org.apache.ignite.plugin.extensions.communication.Message;
2324

2425
/**
2526
*
@@ -31,7 +32,7 @@ public interface MessageService extends Service {
3132
* @param nodeId Node ID.
3233
* @param msg Message.
3334
*/
34-
void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
35+
void send(UUID nodeId, Message msg) throws IgniteCheckedException;
3536

3637
/**
3738
* Checks whether a node with given ID is alive.

0 commit comments

Comments
 (0)