Skip to content

Commit 8651672

Browse files
committed
tx manager
1 parent da2af74 commit 8651672

1 file changed

Lines changed: 7 additions & 11 deletions

File tree

  • modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.function.BiFunction;
3939
import java.util.function.Consumer;
4040
import java.util.function.Predicate;
41+
import java.util.function.Supplier;
4142
import org.apache.ignite.IgniteCheckedException;
4243
import org.apache.ignite.IgniteClientDisconnectedException;
4344
import org.apache.ignite.IgniteException;
@@ -3140,7 +3141,6 @@ private TxRecoveryInitRunnable(ClusterNode node) {
31403141

31413142
for (final IgniteInternalTx tx : activeTransactions()) {
31423143
Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
3143-
boolean localInMaster = tx.masterNodeIds().contains(cctx.localNodeId());
31443144

31453145
if (tx.storeWriteThrough() && txNodes != null
31463146
&& tx.near() && txNodes.containsKey(evtNodeId)
@@ -3150,17 +3150,13 @@ private TxRecoveryInitRunnable(ClusterNode node) {
31503150
sendTxSalvage(tx, evtNodeId);
31513151
}
31523152

3153-
if (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId)) {
3154-
if (localInMaster || tx.eventNodeId().equals(evtNodeId))
3155-
salvageTx(tx, RECOVERY_FINISH);
3156-
}
3157-
else if (tx.storeWriteThrough() && !localInMaster
3158-
&& tx.nodeId().equals(evtNodeId) && tx.state() == PREPARED) {
3159-
boolean fullSyncedOp = tx.writeEntries().stream().map(e ->
3160-
cctx.cacheContext(e.cacheId())).allMatch(GridCacheContext::syncCommit);
3153+
Supplier<Boolean> fullSyncedOp = () -> tx.writeEntries().stream().map(e ->
3154+
cctx.cacheContext(e.cacheId())).allMatch(GridCacheContext::syncCommit);
31613155

3162-
if (!fullSyncedOp)
3163-
salvageTx(tx, RECOVERY_FINISH);
3156+
if (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId) && tx.eventNodeId().equals(evtNodeId))
3157+
salvageTx(tx, RECOVERY_FINISH);
3158+
else if (tx.storeWriteThrough() && !tx.masterNodeIds().contains(cctx.localNodeId())
3159+
&& tx.nodeId().equals(evtNodeId) && tx.state() == PREPARED && fullSyncedOp.get()) {
31643160
// Delay a commit, on backup. It will be raised further after near or coord. node will confirm it.
31653161
}
31663162
else if ((tx.near() && !tx.local() && tx.originatingNodeId().equals(evtNodeId))

0 commit comments

Comments
 (0)