Skip to content

Commit d4f8350

Browse files
authored
IGNITE-28541 Use MarshallableMessage for the compute messages (#13024)
1 parent c1ec826 commit d4f8350

5 files changed

Lines changed: 20 additions & 62 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsResponse.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,12 @@
2323
import org.apache.ignite.internal.util.typedef.internal.S;
2424
import org.apache.ignite.internal.util.typedef.internal.U;
2525
import org.apache.ignite.marshaller.Marshaller;
26-
import org.apache.ignite.plugin.extensions.communication.Message;
2726
import org.jetbrains.annotations.Nullable;
2827

2928
/**
3029
* Job siblings response.
3130
*/
32-
public class GridJobSiblingsResponse implements Message {
31+
public class GridJobSiblingsResponse implements MarshallableMessage {
3332
/** */
3433
private @Nullable Collection<ComputeJobSibling> siblings;
3534

@@ -58,23 +57,14 @@ public GridJobSiblingsResponse(@Nullable Collection<ComputeJobSibling> siblings)
5857
return siblings;
5958
}
6059

61-
/**
62-
* Marshals siblings to byte array.
63-
*
64-
* @param marsh Marshaller.
65-
* @throws IgniteCheckedException In case of error.
66-
*/
67-
public void marshalSiblings(Marshaller marsh) throws IgniteCheckedException {
68-
siblingsBytes = U.marshal(marsh, siblings);
60+
/** {@inheritDoc} */
61+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
62+
if (siblings != null)
63+
siblingsBytes = U.marshal(marsh, siblings);
6964
}
7065

71-
/**
72-
* Unmarshals siblings from byte array.
73-
*
74-
* @param marsh Marshaller.
75-
* @throws IgniteCheckedException In case of error.
76-
*/
77-
public void unmarshalSiblings(Marshaller marsh) throws IgniteCheckedException {
66+
/** {@inheritDoc} */
67+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
7868
assert marsh != null;
7969

8070
if (siblingsBytes != null) {
@@ -84,7 +74,6 @@ public void unmarshalSiblings(Marshaller marsh) throws IgniteCheckedException {
8474
}
8575
}
8676

87-
8877
/** {@inheritDoc} */
8978
@Override public String toString() {
9079
return S.toString(GridJobSiblingsResponse.class, this);

modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -655,17 +655,6 @@ else if (!nodeId.equals(taskNodeId))
655655
else {
656656
// Sender and message type are fine.
657657
res = (GridJobSiblingsResponse)msg;
658-
659-
if (res.jobSiblings() == null) {
660-
try {
661-
res.unmarshalSiblings(marsh);
662-
}
663-
catch (IgniteCheckedException e) {
664-
U.error(log, "Failed to unmarshal job siblings.", e);
665-
666-
err = e.getMessage();
667-
}
668-
}
669658
}
670659

671660
lock.lock();

modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public GridTaskCommandHandler(final GridKernalContext ctx) {
133133
if (err != null)
134134
res.error(err.getMessage());
135135
else
136-
res.marshalResult(ctx, desc.result());
136+
res.res = desc.result();
137137
}
138138
else
139139
res.found(false);
@@ -430,13 +430,6 @@ else if (!nodeId.equals(resHolderId))
430430
// Sender and message type are fine.
431431
res = (GridTaskResultResponse)msg;
432432

433-
try {
434-
res.unmarshalResult(ctx);
435-
}
436-
catch (IgniteCheckedException e) {
437-
U.error(log, "Failed to unmarshal task result: " + res, e);
438-
}
439-
440433
lock.lock();
441434

442435
try {

modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@
1818
package org.apache.ignite.internal.processors.rest.handlers.task;
1919

2020
import org.apache.ignite.IgniteCheckedException;
21-
import org.apache.ignite.internal.GridKernalContext;
21+
import org.apache.ignite.internal.MarshallableMessage;
2222
import org.apache.ignite.internal.Order;
2323
import org.apache.ignite.internal.util.typedef.internal.U;
24-
import org.apache.ignite.plugin.extensions.communication.Message;
24+
import org.apache.ignite.marshaller.Marshaller;
2525
import org.jetbrains.annotations.Nullable;
2626

2727
/**
2828
* Task result response.
2929
*/
30-
public class GridTaskResultResponse implements Message {
30+
public class GridTaskResultResponse implements MarshallableMessage {
3131
/** Result. */
32-
private @Nullable Object res;
32+
public @Nullable Object res;
3333

3434
/** Serialized result. */
3535
@Order(0)
36-
byte[] resBytes;
36+
@Nullable byte[] resBytes;
3737

3838
/** Finished flag. */
3939
@Order(1)
@@ -96,24 +96,16 @@ public void error(String err) {
9696
this.err = err;
9797
}
9898

99-
/**
100-
* Marshals task result to byte array.
101-
*
102-
* @param ctx Context.
103-
* @param res Task result.
104-
*/
105-
public void marshalResult(GridKernalContext ctx, @Nullable Object res) throws IgniteCheckedException {
106-
resBytes = U.marshal(ctx, res);
99+
/** {@inheritDoc} */
100+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
101+
if (res != null)
102+
resBytes = U.marshal(marsh, res);
107103
}
108104

109-
/**
110-
* Unmarshals task result from byte array.
111-
*
112-
* @param ctx Context.
113-
*/
114-
public void unmarshalResult(GridKernalContext ctx) throws IgniteCheckedException {
105+
/** {@inheritDoc} */
106+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
115107
if (resBytes != null) {
116-
res = U.unmarshal(ctx, resBytes, U.resolveClassLoader(ctx.config()));
108+
res = U.unmarshal(marsh, resBytes, clsLdr);
117109

118110
// It is not required anymore.
119111
resBytes = null;

modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,13 +1418,8 @@ private class JobSiblingsMessageListener implements GridMessageListener {
14181418
try {
14191419
Object topic = TOPIC_JOB_SIBLINGS.topic(req.sessionId(), req.topicId());
14201420

1421-
boolean loc = ctx.localNodeId().equals(nodeId);
1422-
14231421
GridJobSiblingsResponse resp = new GridJobSiblingsResponse(siblings);
14241422

1425-
if (!loc)
1426-
resp.marshalSiblings(marsh);
1427-
14281423
ctx.io().sendToCustomTopic(nodeId, topic, resp, SYSTEM_POOL);
14291424
}
14301425
catch (IgniteCheckedException e) {

0 commit comments

Comments
 (0)