Skip to content

Commit 6f3d429

Browse files
Do not store duplicate schema history records
Clear the schema history if the replicator was stopped in the middle of a snapshot
1 parent 72c563c commit 6f3d429

4 files changed

Lines changed: 28 additions & 16 deletions

File tree

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/DBSchemaHistory.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
import io.cdap.cdap.api.common.Bytes;
2020
import io.cdap.delta.api.DeltaRuntimeContext;
21+
import io.debezium.connector.AbstractSourceInfo;
2122
import io.debezium.document.DocumentReader;
2223
import io.debezium.document.DocumentWriter;
2324
import io.debezium.relational.history.AbstractDatabaseHistory;
2425
import io.debezium.relational.history.DatabaseHistoryException;
2526
import io.debezium.relational.history.HistoryRecord;
27+
import io.debezium.relational.history.HistoryRecordComparator;
2628

2729
import java.io.IOException;
2830
import java.util.ArrayList;
@@ -47,6 +49,19 @@ public static void wipeHistory() throws IOException {
4749
@Override
4850
protected synchronized void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
4951
List<HistoryRecord> history = getHistory();
52+
//ignore history records that have already been seen
53+
//we serialize the history record once DDL event is seen in the source
54+
//however offset is committed once event is applied in the target
55+
//so it's possible that debezium is resuming from a point that is earlier than the last
56+
//serialized history record. And when recovering the history records, debezium will ignore those
57+
//history records that are later than the resuming point.
58+
//Thus it's possible for Debezium to store a history record that has already been serialized.
59+
//And all snapshot history records will have the same position
60+
if (Boolean.TRUE != record.document().getDocument(HistoryRecord.Fields.POSITION)
61+
.getBoolean(AbstractSourceInfo.SNAPSHOT_KEY) && !history.isEmpty() &&
62+
HistoryRecordComparator.INSTANCE.isAtOrBefore(record, history.get(history.size() - 1))) {
63+
return;
64+
}
5065
history.add(record);
5166
String historyStr = history.stream().map(r -> {
5267
try {

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.cdap.delta.mysql;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20-
import io.cdap.delta.api.DeltaFailureException;
2120
import io.cdap.delta.api.DeltaSourceContext;
2221
import io.cdap.delta.api.EventEmitter;
2322
import io.cdap.delta.api.EventReader;
@@ -93,14 +92,15 @@ public void start(Offset offset) {
9392
Map<String, SourceTable> sourceTableMap = sourceTables.stream().collect(
9493
Collectors.toMap(t -> config.getDatabase() + "." + t.getTable(), t -> t));
9594
Map<String, String> state = offset.get(); // state map is always not null
95+
String isSnapshot = state.getOrDefault(MySqlConstantOffsetBackingStore.SNAPSHOT, "");
9696
Configuration.Builder configBuilder = Configuration.create()
9797
.with("connector.class", MySqlConnector.class.getName())
9898
.with("offset.storage", MySqlConstantOffsetBackingStore.class.getName())
9999
.with("offset.flush.interval.ms", 1000)
100100
/* bind offset configs with debeziumConf */
101101
.with("file", state.getOrDefault(MySqlConstantOffsetBackingStore.FILE, ""))
102102
.with("pos", state.getOrDefault(MySqlConstantOffsetBackingStore.POS, ""))
103-
.with("snapshot", state.getOrDefault(MySqlConstantOffsetBackingStore.SNAPSHOT, ""))
103+
.with("snapshot", isSnapshot)
104104
.with("row", state.getOrDefault(MySqlConstantOffsetBackingStore.ROW, ""))
105105
.with("event", state.getOrDefault(MySqlConstantOffsetBackingStore.EVENT, ""))
106106
.with("gtids", state.getOrDefault(MySqlConstantOffsetBackingStore.GTID_SET, ""))
@@ -125,13 +125,11 @@ public void start(Offset offset) {
125125
MySqlConnectorConfig mysqlConf = new MySqlConnectorConfig(debeziumConf);
126126
DBSchemaHistory.deltaRuntimeContext = context;
127127
/*
128-
this is required in scenarios where the source is able to emit the starting DDL events during snapshotting,
129-
but the target is unable to apply them. In that case, this reader will be created again, but it won't re-emit
130-
those DDL events unless the DB history is wiped. This only fixes handling of DDL errors that
131-
happen during the initial snapshot.
132-
TODO: (CDAP-16735) fix this more comprehensively
128+
* All snapshot events or schema history records have the same position/offset;
129+
* if the replicator was stopped or paused in the middle of a snapshot, it
130+
* will resume from beginning.
133131
*/
134-
if (offset.get().isEmpty()) {
132+
if (offset.get().isEmpty() || "true".equalsIgnoreCase(isSnapshot)) {
135133
try {
136134
DBSchemaHistory.wipeHistory();
137135
} catch (IOException e) {

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void accept(SourceRecord sourceRecord) {
126126
// This should not happen, 'source' is a mandatory field in sourceRecord from debezium
127127
return;
128128
}
129-
boolean isSnapshot = Boolean.TRUE.equals(source.get(MySqlConstantOffsetBackingStore.SNAPSHOT));
129+
boolean isSnapshot = Boolean.TRUE.equals(deltaOffset.get(MySqlConstantOffsetBackingStore.SNAPSHOT));
130130
// If the map is empty, we should read all DDL/DML events and columns of all tables
131131
boolean readAllTables = sourceTableMap.isEmpty();
132132

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public void start(Offset offset) {
9898

9999
Map<String, String> state = offset.get(); // this will never be null
100100
// offset config
101+
String isSnapshotCompleted = state.getOrDefault(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED, "");
101102
Configuration debeziumConf = Configuration.create()
102103
.with("connector.class", SqlServerConnector.class.getName())
103104
.with("offset.storage", SqlServerConstantOffsetBackingStore.class.getName())
@@ -106,7 +107,7 @@ public void start(Offset offset) {
106107
.with("change_lsn", state.getOrDefault(SourceInfo.CHANGE_LSN_KEY, ""))
107108
.with("commit_lsn", state.getOrDefault(SourceInfo.COMMIT_LSN_KEY, ""))
108109
.with("snapshot", state.getOrDefault(SourceInfo.SNAPSHOT_KEY, ""))
109-
.with("snapshot_completed", state.getOrDefault(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED, ""))
110+
.with("snapshot_completed", isSnapshotCompleted)
110111
/* begin connector properties */
111112
.with("name", "delta")
112113
.with("database.hostname", config.getHost())
@@ -127,13 +128,11 @@ public void start(Offset offset) {
127128
new HashSet<>(Arrays.asList(snapshotTablesStr.split(SqlServerOffset.DELIMITER)));
128129

129130
/*
130-
this is required in scenarios where the source is able to emit the starting DDL events during snapshotting,
131-
but the target is unable to apply them. In that case, this reader will be created again, but it won't re-emit
132-
those DDL events unless the DB history is wiped. This only fixes handling of DDL errors that
133-
happen during the initial snapshot.
134-
TODO: (CDAP-16735) fix this more comprehensively
131+
* All snapshot events or schema history records have the same position/offset;
132+
* if the replicator was stopped or paused in the middle of a snapshot, it
133+
* will resume from beginning.
135134
*/
136-
if (offset.get().isEmpty()) {
135+
if (offset.get().isEmpty() || !"true".equalsIgnoreCase(isSnapshotCompleted)) {
137136
try {
138137
DBSchemaHistory.wipeHistory();
139138
} catch (IOException e) {

0 commit comments

Comments
 (0)