Skip to content

Commit a5fae73

Browse files
committed
IGNITE-17668 Fix removing archive WAL segments for in-memory CDC mode (#10248)
(cherry picked from commit 6e6132f)
1 parent ab3b948 commit a5fae73

4 files changed

Lines changed: 118 additions & 19 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2951,7 +2951,7 @@ private byte convertToTxState(TransactionState state) {
29512951
* @param highBound Upper bound.
29522952
* @throws IgniteCheckedException If failed.
29532953
*/
2954-
public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
2954+
@Override public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
29552955
checkpointManager.removeCheckpointsUntil(highBound);
29562956
}
29572957

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,17 +1156,18 @@ public void startMemoryRestore(GridKernalContext kctx, TimeBag startTimer) throw
11561156
if (!CU.isCdcEnabled(kctx.config()) || kctx.clientNode())
11571157
return;
11581158

1159-
WALIterator iter = cctx.wal(true).replay(null, (type, ptr) -> true);
1159+
try (WALIterator iter = cctx.wal(true).replay(null, (type, ptr) -> true)) {
1160+
while (iter.hasNext())
1161+
iter.next();
11601162

1161-
while (iter.hasNext())
1162-
iter.next();
1163+
WALPointer ptr = iter.lastRead().orElse(null);
11631164

1164-
WALPointer ptr = iter.lastRead().orElse(null);
1165+
if (ptr != null)
1166+
ptr = ptr.next();
11651167

1166-
if (ptr != null)
1167-
ptr = ptr.next();
1168-
1169-
cctx.wal(true).resumeLogging(ptr);
1168+
cctx.wal(true).startAutoReleaseSegments();
1169+
cctx.wal(true).resumeLogging(ptr);
1170+
}
11701171
}
11711172

11721173
/**
@@ -1750,4 +1751,14 @@ private void checkRegionWarmUpConfiguration(
17501751
(warmUpConfig) -> "Unknown data region warm-up configuration: " + errPostfix.get()
17511752
);
17521753
}
1754+
1755+
/**
1756+
* Wal truncate callback.
1757+
*
1758+
* @param highBound Upper bound.
1759+
* @throws IgniteCheckedException If failed.
1760+
*/
1761+
public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
1762+
// No-op.
1763+
}
17531764
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.ignite.IgniteCheckedException;
5757
import org.apache.ignite.IgniteLogger;
5858
import org.apache.ignite.IgniteSystemProperties;
59+
import org.apache.ignite.configuration.DataRegionConfiguration;
5960
import org.apache.ignite.configuration.DataStorageConfiguration;
6061
import org.apache.ignite.configuration.DiskPageCompression;
6162
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -81,8 +82,8 @@
8182
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
8283
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
8384
import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
85+
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
8486
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
85-
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
8687
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
8788
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
8889
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
@@ -340,6 +341,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
340341
/** Positive (non-0) value indicates WAL must be archived even if not complete. */
341342
private final long walForceArchiveTimeout;
342343

344+
/**
345+
* {@code True} if WAL enabled only for CDC.
346+
* This mean {@link DataRegionConfiguration#isPersistenceEnabled()} is {@code false} for all {@link DataRegion},
347+
* and {@link DataRegionConfiguration#isCdcEnabled()} {@code true} for some of them.
348+
*/
349+
private final boolean inMemoryCdc;
350+
343351
/**
344352
* Container with last WAL record logged timestamp.<br> Zero value means there was no records logged to current
345353
* segment, skip possible archiving for this case<br> Value is filled only for case {@link
@@ -423,6 +431,7 @@ public FileWriteAheadLogManager(final GridKernalContext ctx) {
423431
segmentFileInputFactory = new SimpleSegmentFileInputFactory();
424432
walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
425433
walForceArchiveTimeout = dsCfg.getWalForceArchiveTimeout();
434+
inMemoryCdc = !CU.isPersistenceEnabled(dsCfg) && CU.isCdcEnabled(igCfg);
426435

427436
timeoutRolloverMux = (walAutoArchiveAfterInactivity > 0 || walForceArchiveTimeout > 0) ? new Object() : null;
428437

@@ -1374,6 +1383,11 @@ private FileWriteHandle rollOver(FileWriteHandle cur, @Nullable WALRecord rec) t
13741383
segmentSize.put(idx, currSize);
13751384
}
13761385
finally {
1386+
// Move checkpoint pointer to the edge as node don't have actual checkpoints in `inMemoryCdc=true` mode.
1387+
// This will allow cleaner to remove segments from archive.
1388+
if (inMemoryCdc)
1389+
notchLastCheckpointPtr(hnd.position());
1390+
13771391
if (archiver == null)
13781392
segmentAware.addSize(idx, currSize - reservedSize);
13791393
}
@@ -3294,7 +3308,7 @@ public FileCleaner(IgniteLogger log) {
32943308
+ ", maxSize=" + U.humanReadableByteCount(maxWalArchiveSize) + ']');
32953309
}
32963310

3297-
((GridCacheDatabaseSharedManager)cctx.database()).onWalTruncated(highPtr);
3311+
cctx.database().onWalTruncated(highPtr);
32983312

32993313
int truncated = truncate(highPtr);
33003314

modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java

Lines changed: 82 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
package org.apache.ignite.cdc;
1919

20+
import java.io.File;
2021
import java.util.ArrayList;
2122
import java.util.Arrays;
2223
import java.util.Collection;
2324
import java.util.List;
25+
import java.util.concurrent.ThreadLocalRandom;
2426
import java.util.concurrent.atomic.AtomicInteger;
2527
import java.util.function.Consumer;
28+
import java.util.function.IntConsumer;
2629
import org.apache.ignite.IgniteCache;
2730
import org.apache.ignite.IgniteCheckedException;
2831
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -33,10 +36,12 @@
3336
import org.apache.ignite.configuration.DataStorageConfiguration;
3437
import org.apache.ignite.configuration.IgniteConfiguration;
3538
import org.apache.ignite.internal.IgniteEx;
39+
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
3640
import org.apache.ignite.internal.pagemem.wal.WALIterator;
3741
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
3842
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
3943
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
44+
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
4045
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
4146
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
4247
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
@@ -53,6 +58,9 @@
5358
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
5459
import static org.apache.ignite.cache.CacheMode.REPLICATED;
5560
import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT;
61+
import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
62+
import static org.apache.ignite.internal.util.IgniteUtils.KB;
63+
import static org.apache.ignite.internal.util.IgniteUtils.MB;
5664
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
5765
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
5866

@@ -76,6 +84,9 @@ public class WalForCdcTest extends GridCommonAbstractTest {
7684
/** */
7785
private boolean cdcEnabled;
7886

87+
/** */
88+
private long archiveSz = UNLIMITED_WAL_ARCHIVE;
89+
7990
/** */
8091
@Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
8192
public static Collection<?> parameters() {
@@ -94,6 +105,8 @@ public static Collection<?> parameters() {
94105

95106
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
96107
.setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
108+
.setWalSegmentSize((int)(2 * MB))
109+
.setMaxWalArchiveSize(archiveSz)
97110
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
98111
.setPersistenceEnabled(persistenceEnabled)
99112
.setCdcEnabled(cdcEnabled)));
@@ -196,6 +209,60 @@ public void testWalDisabledIfPersistenceAndCdcDisabled() throws Exception {
196209
assertNull(getFieldValue(ignite.context().cache().context(), "cdcWalMgr"));
197210
}
198211

212+
/** */
213+
@Test
214+
public void testArchiveCleared() throws Exception {
215+
persistenceEnabled = false;
216+
cdcEnabled = true;
217+
archiveSz = 10 * MB;
218+
219+
IgniteEx ignite = startGrid(0);
220+
221+
ignite.cluster().state(ClusterState.ACTIVE);
222+
223+
IgniteCache<Integer, byte[]> cache = ignite.getOrCreateCache(
224+
new CacheConfiguration<Integer, byte[]>(DEFAULT_CACHE_NAME)
225+
.setCacheMode(mode)
226+
.setAtomicityMode(atomicityMode));
227+
228+
IntConsumer createData = (entryCnt) -> {
229+
for (int i = 0; i < entryCnt; i++) {
230+
byte[] payload = new byte[(int)KB];
231+
232+
ThreadLocalRandom.current().nextBytes(payload);
233+
234+
cache.put(i, payload);
235+
}
236+
};
237+
238+
IgniteWriteAheadLogManager wal = ignite.context().cache().context().wal(true);
239+
240+
long startSgmnt = wal.currentSegment();
241+
242+
createData.accept((int)(archiveSz / (2 * KB)));
243+
244+
long finishSgmnt = wal.currentSegment();
245+
246+
String archive = archive(ignite);
247+
248+
assertTrue(finishSgmnt > startSgmnt);
249+
assertTrue(
250+
"Wait for start segment archivation",
251+
waitForCondition(() -> startSgmnt <= wal.lastArchivedSegment(), getTestTimeout())
252+
);
253+
254+
File startSgmntArchived = new File(archive, FileDescriptor.fileName(startSgmnt));
255+
256+
assertTrue("Check archived segment file exists", startSgmntArchived.exists());
257+
258+
createData.accept((int)(archiveSz / KB));
259+
260+
assertTrue(
261+
"Wait for archived segment cleaned",
262+
waitForCondition(() -> !startSgmntArchived.exists(), getTestTimeout())
263+
);
264+
}
265+
199266
/** */
200267
private void doTestWal(
201268
IgniteEx ignite,
@@ -223,16 +290,9 @@ private void doTestWal(
223290

224291
/** */
225292
private int checkDataRecords(IgniteEx ignite) throws IgniteCheckedException {
226-
String archive = U.resolveWorkDirectory(
227-
U.defaultWorkDirectory(),
228-
ignite.configuration().getDataStorageConfiguration().getWalArchivePath() + "/" +
229-
U.maskForFileName(ignite.configuration().getIgniteInstanceName()),
230-
false
231-
).getAbsolutePath();
232-
233293
WALIterator iter = new IgniteWalIteratorFactory(log).iterator(new IteratorParametersBuilder()
234294
.ioFactory(new RandomAccessFileIOFactory())
235-
.filesOrDirs(archive));
295+
.filesOrDirs(archive(ignite)));
236296

237297
int walRecCnt = 0;
238298

@@ -254,4 +314,18 @@ private int checkDataRecords(IgniteEx ignite) throws IgniteCheckedException {
254314

255315
return walRecCnt;
256316
}
317+
318+
/**
319+
* @param ignite Ignite.
320+
* @return WAL archive patch
321+
* @throws IgniteCheckedException If failed
322+
*/
323+
private static String archive(IgniteEx ignite) throws IgniteCheckedException {
324+
return U.resolveWorkDirectory(
325+
U.defaultWorkDirectory(),
326+
ignite.configuration().getDataStorageConfiguration().getWalArchivePath() + "/" +
327+
U.maskForFileName(ignite.configuration().getIgniteInstanceName()),
328+
false
329+
).getAbsolutePath();
330+
}
257331
}

0 commit comments

Comments
 (0)