Skip to content

Commit b0f86ae

Browse files
committed
IGNITE-17656 Fix race condition between concurrent updates of remaining, singleMsgs collections in DistributedProcess. (#10241)
1 parent cd6baa1 commit b0f86ae

1 file changed

Lines changed: 10 additions & 18 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -217,20 +217,15 @@ public DistributedProcess(
217217
p.resFut.listen(f -> sendSingleMessage(p));
218218
}
219219
else if (F.eq(ctx.localNodeId(), p.crdId)) {
220-
boolean rmvd, isEmpty;
220+
boolean isEmpty = false;
221221

222222
synchronized (mux) {
223-
rmvd = p.remaining.remove(leftNodeId);
224-
225-
isEmpty = p.remaining.isEmpty();
223+
if (p.remaining.remove(leftNodeId))
224+
isEmpty = p.remaining.isEmpty();
226225
}
227226

228-
if (rmvd) {
229-
assert !p.singleMsgs.containsKey(leftNodeId);
230-
231-
if (isEmpty)
232-
finishProcess(p);
233-
}
227+
if (isEmpty)
228+
finishProcess(p);
234229
}
235230
});
236231
}
@@ -316,20 +311,17 @@ private void onSingleNodeMessageReceived(SingleNodeMessage<R> msg, UUID nodeId)
316311
Process p = processes.computeIfAbsent(msg.processId(), id -> new Process(msg.processId()));
317312

318313
p.initCrdFut.listen(f -> {
319-
boolean rmvd, isEmpty;
314+
boolean isEmpty;
320315

321316
synchronized (mux) {
322-
rmvd = p.remaining.remove(nodeId);
317+
if (p.remaining.remove(nodeId))
318+
p.singleMsgs.put(nodeId, msg);
323319

324320
isEmpty = p.remaining.isEmpty();
325321
}
326322

327-
if (rmvd) {
328-
p.singleMsgs.put(nodeId, msg);
329-
330-
if (isEmpty)
331-
finishProcess(p);
332-
}
323+
if (isEmpty)
324+
finishProcess(p);
333325
});
334326
}
335327

0 commit comments

Comments
 (0)