Skip to content

Commit 00842a7

Browse files
committed
IGNITE-16136 Fix deadlock on system thread pool during marshaller mapping and binary metadata requests (#10204)
1 parent 150f2c2 commit 00842a7

15 files changed

Lines changed: 604 additions & 133 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java

Lines changed: 7 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -363,7 +363,13 @@ public MappedName onMappingProposed(MarshallerMappingItem item) {
363363
public void onMappingAccepted(final MarshallerMappingItem item) {
364364
ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId());
365365

366-
cache.replace(item.typeId(), new MappedName(item.className(), true));
366+
MappedName oldMappedName = cache.put(item.typeId(), new MappedName(item.className(), true));
367+
368+
assert oldMappedName == null || item.className().equals(oldMappedName.className()) :
369+
"Class name resolved from cluster: "
370+
+ item.className()
371+
+ ", class name from local cache: "
372+
+ oldMappedName.className();
367373

368374
closProc.runLocalSafe(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), item.className()));
369375
}
@@ -498,30 +504,6 @@ public String resolveMissedMapping(byte platformId, int typeId) {
498504
return null;
499505
}
500506

501-
/**
502-
* @param item Item.
503-
* @param resolvedClsName Resolved class name.
504-
*/
505-
public void onMissedMappingResolved(final MarshallerMappingItem item, String resolvedClsName) {
506-
ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId());
507-
508-
int typeId = item.typeId();
509-
MappedName mappedName = cache.get(typeId);
510-
511-
if (mappedName != null)
512-
assert resolvedClsName.equals(mappedName.className()) :
513-
"Class name resolved from cluster: "
514-
+ resolvedClsName
515-
+ ", class name from local cache: "
516-
+ mappedName.className();
517-
else {
518-
mappedName = new MappedName(resolvedClsName, true);
519-
cache.putIfAbsent(typeId, mappedName);
520-
521-
closProc.runLocalSafe(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), resolvedClsName));
522-
}
523-
}
524-
525507
/** {@inheritDoc} */
526508
@Override public boolean isSystemType(String typeName) {
527509
return sysTypesSet.contains(typeName);

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java

Lines changed: 38 additions & 75 deletions
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,6 @@
5353
import org.apache.ignite.internal.util.typedef.internal.U;
5454
import org.apache.ignite.lang.IgniteInClosure;
5555
import org.jetbrains.annotations.Nullable;
56-
5756
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
5857
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
5958
import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata;
@@ -354,13 +353,15 @@ GridFutureAdapter<?> awaitSchemaUpdate(int typeId, int schemaId) {
354353
}
355354

356355
/**
357-
* Allows client node to request latest version of binary metadata for a given typeId from the cluster in case
356+
* Allows client node to request the latest version of binary metadata for a given typeId from the cluster in case
358357
* client is able to detect that it has obsolete metadata in its local cache.
359358
*
360359
* @param typeId ID of binary type.
361360
* @return future to wait for request arrival on.
362361
*/
363362
GridFutureAdapter<MetadataUpdateResult> requestUpToDateMetadata(int typeId) {
363+
assert ctx.clientNode();
364+
364365
ClientMetadataRequestFuture newFut = new ClientMetadataRequestFuture(ctx, typeId, clientReqSyncMap);
365366

366367
ClientMetadataRequestFuture oldFut = clientReqSyncMap.putIfAbsent(typeId, newFut);
@@ -604,35 +605,12 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
604605
fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError()));
605606
else {
606607
if (clientNode) {
607-
BinaryMetadataHolder newHolder = new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer);
608-
609-
holder = metaLocCache.putIfAbsent(typeId, newHolder);
610-
611-
if (holder != null) {
612-
boolean obsoleteUpd = false;
608+
boolean success = casBinaryMetadata(typeId, new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer));
613609

614-
do {
615-
holder = metaLocCache.get(typeId);
616-
617-
if (obsoleteUpdate(
618-
holder.pendingVersion(),
619-
holder.acceptedVersion(),
620-
pendingVer,
621-
acceptedVer)) {
622-
obsoleteUpd = true;
623-
624-
fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
625-
626-
break;
627-
}
628-
}
629-
while (!metaLocCache.replace(typeId, holder, newHolder));
630-
631-
if (!obsoleteUpd)
632-
initSyncFor(typeId, pendingVer, fut);
633-
}
634-
else
610+
if (success)
635611
initSyncFor(typeId, pendingVer, fut);
612+
else
613+
fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
636614
}
637615
else {
638616
initSyncFor(typeId, pendingVer, fut);
@@ -659,24 +637,8 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
659637

660638
BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer);
661639

662-
if (clientNode) {
663-
holder = metaLocCache.putIfAbsent(typeId, newHolder);
664-
665-
if (holder != null) {
666-
do {
667-
holder = metaLocCache.get(typeId);
668-
669-
if (obsoleteUpdate(
670-
holder.pendingVersion(),
671-
holder.acceptedVersion(),
672-
pendingVer,
673-
acceptedVer))
674-
break;
675-
676-
}
677-
while (!metaLocCache.replace(typeId, holder, newHolder));
678-
}
679-
}
640+
if (clientNode)
641+
casBinaryMetadata(typeId, newHolder);
680642
else {
681643
if (log.isDebugEnabled())
682644
log.debug("Updated metadata on server node [holder=" + newHolder +
@@ -717,18 +679,13 @@ private final class MetadataUpdateAcceptedListener implements CustomEventListene
717679
int newAcceptedVer = msg.acceptedVersion();
718680

719681
if (clientNode) {
720-
BinaryMetadataHolder newHolder = new BinaryMetadataHolder(holder.metadata(),
721-
holder.pendingVersion(), newAcceptedVer);
722-
723-
do {
724-
holder = metaLocCache.get(typeId);
682+
boolean success = casBinaryMetadata(typeId,
683+
new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), newAcceptedVer));
725684

726-
int oldAcceptedVer = holder.acceptedVersion();
685+
ClientMetadataRequestFuture fut = clientReqSyncMap.get(typeId);
727686

728-
if (oldAcceptedVer > newAcceptedVer)
729-
break;
730-
}
731-
while (!metaLocCache.replace(typeId, holder, newHolder));
687+
if (success && fut != null)
688+
fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
732689
}
733690
else {
734691
int oldAcceptedVer = holder.acceptedVersion();
@@ -783,6 +740,29 @@ private final class MetadataUpdateAcceptedListener implements CustomEventListene
783740
}
784741
}
785742

743+
/**
 * Puts {@code newHolder} into {@code metaLocCache} under {@code typeId} unless the holder that is
 * already cached makes the update obsolete.
 * <p>
 * The method first tries {@code putIfAbsent}; if an entry already exists it asks
 * {@code obsoleteUpdate(...)} (defined elsewhere in this class) whether the new holder's
 * pending/accepted versions are obsolete relative to the cached holder's versions, and otherwise
 * attempts a conditional {@code replace(typeId, oldHolder, newHolder)}. When the conditional
 * replace fails the whole sequence is retried.
 * <p>
 * NOTE(review): the retry loop presumably relies on {@code metaLocCache} being a concurrent map
 * with atomic {@code putIfAbsent}/{@code replace} - confirm against the field declaration.
 *
744+
* @param typeId Type id.
745+
* @param newHolder New binary metadata holder.
746+
* @return {@code true} if new holder was added successfully.
 *      {@code false} if {@code obsoleteUpdate(...)} reported the update as obsolete, in which case
 *      the cache is left untouched by this call.
747+
*/
748+
private boolean casBinaryMetadata(int typeId, BinaryMetadataHolder newHolder) {
749+
BinaryMetadataHolder oldHolder;
750+
751+
do {
752+
// No entry yet for this type: the new holder is stored and we are done.
oldHolder = metaLocCache.putIfAbsent(typeId, newHolder);
753+
754+
if (oldHolder == null)
755+
return true;
756+
757+
// An entry exists: give up if the new holder's versions are obsolete compared to it.
if (obsoleteUpdate(oldHolder.pendingVersion(), oldHolder.acceptedVersion(),
758+
newHolder.pendingVersion(), newHolder.acceptedVersion()))
759+
return false;
760+
}
761+
// Swap only if the cached value is still the holder we examined; otherwise start over.
while (!metaLocCache.replace(typeId, oldHolder, newHolder));
762+
763+
return true;
764+
}
765+
786766
/**
787767
* Future class responsible for blocking threads until particular events with metadata updates happen, e.g. arriving
788768
* {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response.
@@ -915,24 +895,7 @@ private final class MetadataResponseListener implements GridMessageListener {
915895
}
916896

917897
try {
918-
BinaryMetadataHolder newHolder = U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config()));
919-
920-
BinaryMetadataHolder oldHolder = metaLocCache.putIfAbsent(typeId, newHolder);
921-
922-
if (oldHolder != null) {
923-
do {
924-
oldHolder = metaLocCache.get(typeId);
925-
926-
// typeId metadata cannot be removed after initialization.
927-
if (obsoleteUpdate(
928-
oldHolder.pendingVersion(),
929-
oldHolder.acceptedVersion(),
930-
newHolder.pendingVersion(),
931-
newHolder.acceptedVersion()))
932-
break;
933-
}
934-
while (!metaLocCache.replace(typeId, oldHolder, newHolder));
935-
}
898+
casBinaryMetadata(typeId, U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config())));
936899

937900
fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
938901
}

modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java

Lines changed: 15 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -97,12 +97,12 @@ void requestMapping() {
9797

9898
try {
9999
ioMgr.sendToGridTopic(
100-
srvNode,
101-
GridTopic.TOPIC_MAPPING_MARSH,
102-
new MissingMappingRequestMessage(
103-
item.platformId(),
104-
item.typeId()),
105-
GridIoPolicy.SYSTEM_POOL);
100+
srvNode,
101+
GridTopic.TOPIC_MAPPING_MARSH,
102+
new MissingMappingRequestMessage(
103+
item.platformId(),
104+
item.typeId()),
105+
GridIoPolicy.SYSTEM_POOL);
106106

107107
if (discoMgr.node(srvNode.id()) == null)
108108
continue;
@@ -113,21 +113,22 @@ void requestMapping() {
113113
}
114114
catch (IgniteCheckedException ignored) {
115115
U.warn(log,
116-
"Failed to request marshaller mapping from remote node (proceeding with the next one): "
117-
+ srvNode);
116+
"Failed to request marshaller mapping from remote node (proceeding with the next one): "
117+
+ srvNode);
118118
}
119119
}
120120

121121
noSrvsInCluster = pendingNode == null;
122122
}
123123

124-
if (noSrvsInCluster)
124+
if (noSrvsInCluster) {
125125
onDone(MappingExchangeResult.createFailureResult(
126-
new IgniteCheckedException(
127-
"All server nodes have left grid, cannot request mapping [platformId: "
128-
+ item.platformId()
129-
+ "; typeId: "
130-
+ item.typeId() + "]")));
126+
new IgniteCheckedException(
127+
"All server nodes have left grid, cannot request mapping [platformId: "
128+
+ item.platformId()
129+
+ "; typeId: "
130+
+ item.typeId() + "]")));
131+
}
131132
}
132133

133134
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -216,7 +216,7 @@ private final class MissingMappingResponseListener implements GridMessageListene
216216

217217
if (fut != null) {
218218
if (resolvedClsName != null) {
219-
marshallerCtx.onMissedMappingResolved(item, resolvedClsName);
219+
marshallerCtx.onMappingAccepted(new MarshallerMappingItem(platformId, typeId, resolvedClsName));
220220

221221
fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName));
222222
}
@@ -312,6 +312,11 @@ private final class MappingAcceptedListener implements CustomEventListener<Mappi
312312
}
313313
});
314314

315+
ClientRequestFuture rqFut = clientReqSyncMap.get(new MarshallerMappingItem(item.platformId(), item.typeId(), null));
316+
317+
if (rqFut != null)
318+
rqFut.onDone(MappingExchangeResult.createSuccessfulResult(item.className()));
319+
315320
GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item);
316321

317322
if (fut != null)

modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.internal.processors.marshaller;
1919

2020
import java.io.Serializable;
21+
import java.util.Objects;
2122
import org.apache.ignite.internal.util.typedef.internal.S;
2223

2324
/**
@@ -56,6 +57,24 @@ public boolean accepted() {
5657
return accepted;
5758
}
5859

60+
/**
 * {@inheritDoc}
 * <p>
 * Two {@code MappedName} instances are equal when they are of exactly the same class and both
 * their {@code accepted} flag and their {@code clsName} match ({@code clsName} is compared
 * null-safely).
 */
61+
@Override public boolean equals(Object o) {
62+
// Same reference: trivially equal.
if (this == o)
63+
return true;
64+
65+
// Exact class match is required, so subclasses never compare equal to a plain MappedName.
if (o == null || getClass() != o.getClass())
66+
return false;
67+
68+
MappedName name = (MappedName)o;
69+
70+
return accepted == name.accepted && Objects.equals(clsName, name.clsName);
71+
}
72+
73+
/**
 * {@inheritDoc}
 * <p>
 * Computed from {@code clsName} and {@code accepted}, the same two fields that
 * {@code equals(Object)} compares.
 */
74+
@Override public int hashCode() {
75+
return Objects.hash(clsName, accepted);
76+
}
77+
5978
/** {@inheritDoc} */
6079
@Override public String toString() {
6180
return S.toString(MappedName.class, this);

modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.internal.processors.marshaller;
1919

2020
import java.io.Serializable;
21+
import java.util.Objects;
2122
import org.jetbrains.annotations.Nullable;
2223

2324
/**
@@ -84,7 +85,7 @@ public void className(String clsName) {
8485

8586
return platformId == that.platformId
8687
&& typeId == that.typeId
87-
&& (clsName != null ? clsName.equals(that.clsName) : that.clsName == null);
88+
&& (Objects.equals(clsName, that.clsName));
8889
}
8990

9091
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java

Lines changed: 14 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -135,27 +135,28 @@ else if (stopping)
135135
* @param cache Cache.
136136
*/
137137
public GridFutureAdapter<MappingExchangeResult> requestMapping(
138-
MarshallerMappingItem item,
139-
ConcurrentMap<Integer, MappedName> cache
138+
MarshallerMappingItem item,
139+
ConcurrentMap<Integer, MappedName> cache
140140
) {
141-
ClientRequestFuture newFut = new ClientRequestFuture(ctx, item, clientReqSyncMap);
141+
assert ctx.clientNode();
142+
assert item.className() == null;
142143

144+
ClientRequestFuture newFut = new ClientRequestFuture(ctx, item, clientReqSyncMap);
143145
ClientRequestFuture oldFut = clientReqSyncMap.putIfAbsent(item, newFut);
144-
145-
if (oldFut != null)
146-
return oldFut;
146+
ClientRequestFuture resFut = oldFut == null ? newFut : oldFut;
147147

148148
MappedName mappedName = cache.get(item.typeId());
149149

150-
if (mappedName != null) {
151-
newFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
152-
153-
return newFut;
150+
if (mappedName == null) {
151+
if (oldFut == null)
152+
newFut.requestMapping();
153+
}
154+
else {
155+
if (mappedName.accepted())
156+
resFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
154157
}
155158

156-
newFut.requestMapping();
157-
158-
return newFut;
159+
return resFut;
159160
}
160161

161162
/**

0 commit comments

Comments
 (0)