Skip to content

Commit baf3a40

Browse files
vldpyatkov authored and Mmuzaf committed
IGNITE-14138 Fixed an issue where historical rebalance can kill supplier node. Fixes #8769
Signed-off-by: Slava Koptilin <slava.koptilin@gmail.com>
1 parent 843ea8e commit baf3a40

9 files changed

Lines changed: 282 additions & 43 deletions

File tree

modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.ignite.configuration.IgniteConfiguration;
3535
import org.apache.ignite.internal.client.GridClient;
3636
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
37+
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
3738
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
3839
import org.apache.ignite.internal.processors.metric.GridMetricManager;
3940
import org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter;
@@ -1890,6 +1891,14 @@ public final class IgniteSystemProperties {
18901891
public static final String IGNITE_MASTER_KEY_NAME_TO_CHANGE_BEFORE_STARTUP =
18911892
"IGNITE_MASTER_KEY_NAME_TO_CHANGE_BEFORE_STARTUP";
18921893

1894+
/**
1895+
* Disables the group state lazy store. This means that the group state won't be cached for {@link CheckpointEntry} and will be
1896+
* read from the WAL every time. Should be used for test purposes only.
1897+
*/
1898+
@SystemProperty(value = "Disable group state lazy store. It means that group state won't be cached " +
1899+
"and will be read from wal every time", defaults = "false")
1900+
public static final String IGNITE_DISABLE_GRP_STATE_LAZY_STORE = "IGNITE_DISABLE_GRP_STATE_LAZY_STORE";
1901+
18931902
/**
18941903
* Enables extended logging of indexes create/rebuild process. Default {@code false}.
18951904
* <p/>

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,10 @@ else if (grp.preloader().throttle() > 0)
618618
* @param demandMsg Demand message.
619619
*/
620620
private String supplyRoutineInfo(int topicId, UUID demander, GridDhtPartitionDemandMessage demandMsg) {
621-
return "grp=" + grp.cacheOrGroupName() + ", demander=" + demander + ", topVer=" + demandMsg.topologyVersion() + ", topic=" + topicId;
621+
return "grp=" + grp.cacheOrGroupName() +
622+
", demander=" + demander +
623+
", topVer=" + demandMsg.topologyVersion() +
624+
(topicId > 0 ? ", topic=" + topicId : "");
622625
}
623626

624627
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1596,10 +1596,17 @@ private String cacheInfo(GridCacheContext cacheCtx) {
15961596
assert cctx.wal().reserved(cpEntry.checkpointMark())
15971597
: "WAL segment for checkpoint " + cpEntry + " has not reserved";
15981598

1599-
Long updCntr = cpEntry.partitionCounter(cctx.wal(), grpId, partId);
1599+
try {
1600+
Long updCntr = cpEntry.partitionCounter(cctx.wal(), grpId, partId);
16001601

1601-
if (updCntr != null)
1602-
grpPartsWithCnts.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, updCntr);
1602+
if (updCntr != null)
1603+
grpPartsWithCnts.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, updCntr);
1604+
}
1605+
catch (IgniteCheckedException ex) {
1606+
log.warning("Reservation failed because counters are not available [grpId=" + grpId
1607+
+ ", part=" + partId
1608+
+ ", cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp()) + ")]", ex);
1609+
}
16031610
}
16041611
}
16051612

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,11 +1156,6 @@ private Metas getOrAllocateCacheMetas() throws IgniteCheckedException {
11561156

11571157
GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)grp.shared().database();
11581158

1159-
WALPointer latestReservedPointer = database.latestWalPointerReservedForPreloading();
1160-
1161-
if (latestReservedPointer == null)
1162-
throw new IgniteHistoricalIteratorException("Historical iterator wasn't created, because WAL isn't reserved.");
1163-
11641159
Map<Integer, Long> partsCounters = new HashMap<>();
11651160

11661161
for (int i = 0; i < partCntrs.size(); i++) {
@@ -1170,15 +1165,20 @@ private Metas getOrAllocateCacheMetas() throws IgniteCheckedException {
11701165
partsCounters.put(p, initCntr);
11711166
}
11721167

1173-
WALPointer minPtr = database.checkpointHistory().searchEarliestWalPointer(grp.groupId(),
1174-
partsCounters, latestReservedPointer, grp.hasAtomicCaches() ? walAtomicCacheMargin : 0L);
1168+
try {
1169+
WALPointer minPtr = database.checkpointHistory().searchEarliestWalPointer(grp.groupId(),
1170+
partsCounters, grp.hasAtomicCaches() ? walAtomicCacheMargin : 0L);
1171+
1172+
WALPointer latestReservedPointer = database.latestWalPointerReservedForPreloading();
11751173

1176-
assert latestReservedPointer.compareTo(minPtr) <= 0
1177-
: "Historical iterator tries to iterate WAL out of reservation [cache=" + grp.cacheOrGroupName()
1178-
+ ", reservedPointer=" + database.latestWalPointerReservedForPreloading()
1179-
+ ", historicalPointer=" + minPtr + ']';
1174+
assert latestReservedPointer == null || latestReservedPointer.compareTo(minPtr) <= 0
1175+
: "Historical iterator tries to iterate WAL out of reservation [cache=" + grp.cacheOrGroupName()
1176+
+ ", reservedPointer=" + latestReservedPointer
1177+
+ ", historicalPointer=" + minPtr + ']';
1178+
1179+
if (latestReservedPointer == null)
1180+
log.warning("History for the preloading has not reserved yet.");
11801181

1181-
try {
11821182
WALIterator it = grp.shared().wal().replay(minPtr);
11831183

11841184
WALHistoricalIterator histIt = new WALHistoricalIterator(log, grp, partCntrs, partsCounters, it);

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CountDownLatch;
2626
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2727
import org.apache.ignite.IgniteCheckedException;
28+
import org.apache.ignite.IgniteSystemProperties;
2829
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
2930
import org.apache.ignite.internal.pagemem.wal.WALIterator;
3031
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
@@ -36,6 +37,8 @@
3637
import org.apache.ignite.lang.IgniteBiTuple;
3738
import org.jetbrains.annotations.Nullable;
3839

40+
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_GRP_STATE_LAZY_STORE;
41+
3942
/**
4043
* Class represents checkpoint state.
4144
*/
@@ -114,7 +117,7 @@ public Map<Integer, GroupState> groupState(
114117
private GroupStateLazyStore initIfNeeded(IgniteWriteAheadLogManager wal) throws IgniteCheckedException {
115118
GroupStateLazyStore store = grpStateLazyStore.get();
116119

117-
if (store == null) {
120+
if (store == null || IgniteSystemProperties.getBoolean(IGNITE_DISABLE_GRP_STATE_LAZY_STORE, false)) {
118121
store = new GroupStateLazyStore();
119122

120123
grpStateLazyStore = new SoftReference<>(store);
@@ -130,16 +133,10 @@ private GroupStateLazyStore initIfNeeded(IgniteWriteAheadLogManager wal) throws
130133
* @param grpId Cache group ID.
131134
* @param part Partition ID.
132135
* @return Partition counter or {@code null} if not found.
136+
* @throws IgniteCheckedException If loading the counter from the WAL history fails.
133137
*/
134-
public Long partitionCounter(IgniteWriteAheadLogManager wal, int grpId, int part) {
135-
GroupStateLazyStore store;
136-
137-
try {
138-
store = initIfNeeded(wal);
139-
}
140-
catch (IgniteCheckedException e) {
141-
return null;
142-
}
138+
public Long partitionCounter(IgniteWriteAheadLogManager wal, int grpId, int part) throws IgniteCheckedException {
139+
GroupStateLazyStore store = initIfNeeded(wal);
143140

144141
return store.partitionCounter(grpId, part);
145142
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -439,14 +439,12 @@ private IgniteBiTuple<Long, Long> calculateWalSegmentsCovered() {
439439
*
440440
* @param grpId Group id.
441441
* @param partsCounter Partition mapped to update counter.
442-
* @param latestReservedPointer Latest reserved WAL pointer.
443442
* @param margin Margin pointer.
444443
* @return Earliest WAL pointer for group specified.
445444
*/
446445
@Nullable public WALPointer searchEarliestWalPointer(
447446
int grpId,
448447
Map<Integer, Long> partsCounter,
449-
WALPointer latestReservedPointer,
450448
long margin
451449
) throws IgniteCheckedException {
452450
if (F.isEmpty(partsCounter))
@@ -467,6 +465,12 @@ private IgniteBiTuple<Long, Long> calculateWalSegmentsCovered() {
467465

468466
WALPointer ptr = cpEntry.checkpointMark();
469467

468+
if (!wal.reserved(ptr)) {
469+
throw new IgniteCheckedException("WAL pointer appropriate to the checkpoint was not reserved " +
470+
"[cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp())
471+
+ "), ptr=" + ptr + ']');
472+
}
473+
470474
while (iter.hasNext()) {
471475
Map.Entry<Integer, Long> entry = iter.next();
472476

@@ -494,7 +498,7 @@ private IgniteBiTuple<Long, Long> calculateWalSegmentsCovered() {
494498
}
495499
}
496500

497-
if ((F.isEmpty(modifiedPartsCounter) && F.isEmpty(historyPointerCandidate)) || ptr.compareTo(latestReservedPointer) == 0)
501+
if (F.isEmpty(modifiedPartsCounter))
498502
break;
499503
}
500504

@@ -588,7 +592,16 @@ public WALPointer choose(
588592
long margin,
589593
Map<Integer, Long> partCntsForUpdate
590594
) {
591-
Long foundCntr = cpEntry == null ? null : cpEntry.partitionCounter(wal, grpId, part);
595+
Long foundCntr = null;
596+
597+
try {
598+
foundCntr = cpEntry == null ? null : cpEntry.partitionCounter(wal, grpId, part);
599+
}
600+
catch (IgniteCheckedException e) {
601+
log.warning("Checkpoint cannot be chosen because counter is unavailable [grpId=" + grpId
602+
+ ", part=" + part
603+
+ ", cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp()) + ")]", e);
604+
}
592605

593606
if (foundCntr == null || foundCntr == walPntrCntr) {
594607
partCntsForUpdate.put(part, walPntrCntr);
@@ -639,7 +652,9 @@ public WALPointer choose(
639652
if (F.isEmpty(modifiedSearchMap))
640653
return res;
641654
}
642-
catch (IgniteCheckedException ignore) {
655+
catch (IgniteCheckedException e) {
656+
log.warning("Checkpoint data is unavailable in WAL [cpTs=" + U.format(cpTs) + ']', e);
657+
643658
break;
644659
}
645660
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
19+
20+
import java.io.File;
21+
import org.apache.ignite.IgniteCache;
22+
import org.apache.ignite.IgniteSystemProperties;
23+
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
24+
import org.apache.ignite.cluster.ClusterState;
25+
import org.apache.ignite.configuration.CacheConfiguration;
26+
import org.apache.ignite.configuration.DataRegionConfiguration;
27+
import org.apache.ignite.configuration.DataStorageConfiguration;
28+
import org.apache.ignite.configuration.IgniteConfiguration;
29+
import org.apache.ignite.internal.IgniteEx;
30+
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
31+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
32+
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
33+
import org.apache.ignite.internal.util.typedef.internal.U;
34+
import org.apache.ignite.testframework.ListeningTestLogger;
35+
import org.apache.ignite.testframework.LogListener;
36+
import org.apache.ignite.testframework.junits.WithSystemProperty;
37+
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
38+
import org.junit.Test;
39+
40+
/**
41+
* Tests the fallback from historical to full rebalance when the WAL has been corrupted after it was reserved.
42+
*/
43+
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
44+
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_DISABLE_GRP_STATE_LAZY_STORE, value = "true")
45+
public class CacheRebalanceWithRemovedWalSegment extends GridCommonAbstractTest {
46+
/** Listening logger. */
47+
private ListeningTestLogger listeningLog;
48+
49+
/** {@inheritDoc} */
50+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
51+
return super.getConfiguration(igniteInstanceName)
52+
.setGridLogger(listeningLog)
53+
.setCommunicationSpi(new TestRecordingCommunicationSpi())
54+
.setDataStorageConfiguration(new DataStorageConfiguration()
55+
.setWalSegmentSize(512 * 1024)
56+
.setWalSegments(2)
57+
.setCheckpointFrequency(600_000)
58+
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
59+
.setMaxSize(200L * 1024 * 1024)
60+
.setPersistenceEnabled(true)))
61+
.setConsistentId(igniteInstanceName)
62+
.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
63+
.setBackups(1)
64+
.setAffinity(new RendezvousAffinityFunction(false, 16)));
65+
}
66+
67+
/** {@inheritDoc} */
68+
@Override protected void afterTest() throws Exception {
69+
stopAllGrids();
70+
71+
cleanPersistenceDir();
72+
73+
super.afterTest();
74+
}
75+
76+
/** {@inheritDoc} */
77+
@Override protected void beforeTest() throws Exception {
78+
super.beforeTest();
79+
80+
stopAllGrids();
81+
82+
cleanPersistenceDir();
83+
84+
listeningLog = new ListeningTestLogger(log);
85+
}
86+
87+
/**
88+
* @throws Exception If failed.
89+
*/
90+
@Test
91+
public void test() throws Exception {
92+
IgniteEx ignite = startGrids(2);
93+
94+
ignite.cluster().state(ClusterState.ACTIVE);
95+
96+
IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME);
97+
98+
byte[] testVal = new byte[20 * 1024];
99+
100+
for (int i = 0; i < 300; i++)
101+
cache.put(i, testVal);
102+
103+
forceCheckpoint();
104+
105+
ignite(1).close();
106+
107+
for (int i = 300; i < 500; i++)
108+
cache.put(i, testVal);
109+
110+
forceCheckpoint();
111+
112+
stopAllGrids();
113+
114+
ignite = startGridWithBlockedDemandMessages(1, 0);
115+
startGrid(0);
116+
117+
ignite.cluster().state(ClusterState.ACTIVE);
118+
119+
GridDhtPartitionsExchangeFuture exchangeFuture = ignite(0).context().cache().context().exchange().lastTopologyFuture();
120+
121+
// Wait for the reservation to complete, otherwise the problem could be hit during the reservation itself.
122+
exchangeFuture.get();
123+
124+
TestRecordingCommunicationSpi.spi(ignite).waitForBlocked();
125+
126+
File walPath = new File(
127+
U.resolveWorkDirectory(
128+
ignite(0).context().config().getWorkDirectory(),
129+
DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH,
130+
false
131+
),
132+
ignite(0).context().pdsFolderResolver().resolveFolders().folderName()
133+
);
134+
135+
for (File file : walPath.listFiles()) {
136+
if (U.delete(file))
137+
info("File deleted " + file);
138+
else
139+
info("Can't delete file " + file);
140+
}
141+
142+
LogListener lsnr = LogListener.matches("Failed to continue supplying [grp=" + DEFAULT_CACHE_NAME
143+
+ ", demander=" + ignite.localNode().id()
144+
+ ", topVer=" + exchangeFuture.topologyVersion() + ']').build();
145+
146+
listeningLog.registerListener(lsnr);
147+
148+
TestRecordingCommunicationSpi.spi(ignite).stopBlock();
149+
150+
awaitPartitionMapExchange();
151+
152+
assertTrue(lsnr.check());
153+
154+
assertPartitionsSame(idleVerify(ignite, DEFAULT_CACHE_NAME));
155+
}
156+
157+
/**
158+
* Starts a node and blocks the sending of demand messages from it to the other node.
159+
*
160+
* @param nodeIdx Start node index.
161+
* @param demandNodeIdx Demand node index.
162+
* @return Started node Ignite instance.
163+
* @throws Exception If failed.
164+
*/
165+
private IgniteEx startGridWithBlockedDemandMessages(int nodeIdx, int demandNodeIdx) throws Exception {
166+
IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(nodeIdx));
167+
168+
TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
169+
170+
spi.blockMessages(GridDhtPartitionDemandMessage.class, getTestIgniteInstanceName(demandNodeIdx));
171+
172+
return startGrid(cfg);
173+
}
174+
}

0 commit comments

Comments
 (0)