Skip to content

Commit 3194a52

Browse files
Merge pull request #127 from data-integrations/fix/schema-history
Fix Schema History
2 parents 656d901 + 6f3d429 commit 3194a52

4 files changed

Lines changed: 48 additions & 54 deletions

File tree

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/DBSchemaHistory.java

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
import io.cdap.cdap.api.common.Bytes;
2020
import io.cdap.delta.api.DeltaRuntimeContext;
21+
import io.debezium.connector.AbstractSourceInfo;
2122
import io.debezium.document.DocumentReader;
2223
import io.debezium.document.DocumentWriter;
2324
import io.debezium.relational.history.AbstractDatabaseHistory;
2425
import io.debezium.relational.history.DatabaseHistoryException;
2526
import io.debezium.relational.history.HistoryRecord;
27+
import io.debezium.relational.history.HistoryRecordComparator;
2628

2729
import java.io.IOException;
2830
import java.util.ArrayList;
@@ -37,33 +39,30 @@ public class DBSchemaHistory extends AbstractDatabaseHistory {
3739
private static final String KEY = "history";
3840
// Hacky, fix when usage of EmbeddedEngine is replaced
3941
public static DeltaRuntimeContext deltaRuntimeContext;
40-
private static final DocumentWriter writer = DocumentWriter.defaultWriter();
41-
private static final DocumentReader reader = DocumentReader.defaultReader();
42+
private final DocumentWriter writer = DocumentWriter.defaultWriter();
43+
private final DocumentReader reader = DocumentReader.defaultReader();
4244

43-
/**
44-
* Restart the DB schema history from certain {@link HistoryRecord HistoryRecord} specified by the parameter
45-
* @param index the position of the {@link HistoryRecord HistoryRecord} to restart from in the history list. A
46-
* negative value will wipe out all the history
47-
*/
48-
public static void restartFrom(int index) throws IOException {
49-
if (index < 0) {
50-
wipeHistory();
51-
return;
52-
}
53-
storeHistory(getHistory().subList(0, index + 1));
54-
}
5545
public static void wipeHistory() throws IOException {
5646
deltaRuntimeContext.putState(KEY, new byte[] { });
5747
}
5848

5949
@Override
6050
protected synchronized void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
6151
List<HistoryRecord> history = getHistory();
52+
//ignore history records that have already been seen
53+
//we serialize the history record once a DDL event is seen in the source,
54+
//however the offset is committed only once the event is applied in the target,
55+
//so it's possible that debezium is resuming from a point that is earlier than the last
56+
//serialized history record. And when recovering the history records, debezium will ignore those
57+
//history records that are later than the resuming point.
58+
//Thus it's possible for Debezium to store some history records that are already serialized.
59+
//And all snapshot history records will have the same position
60+
if (Boolean.TRUE != record.document().getDocument(HistoryRecord.Fields.POSITION)
61+
.getBoolean(AbstractSourceInfo.SNAPSHOT_KEY) && !history.isEmpty() &&
62+
HistoryRecordComparator.INSTANCE.isAtOrBefore(record, history.get(history.size() - 1))) {
63+
return;
64+
}
6265
history.add(record);
63-
storeHistory(history);
64-
}
65-
66-
private static void storeHistory(List<HistoryRecord> history) {
6766
String historyStr = history.stream().map(r -> {
6867
try {
6968
return writer.write(r.document());
@@ -100,7 +99,7 @@ public boolean storageExists() {
10099
}
101100

102101
// TODO: cache history, should only have to read once
103-
private static List<HistoryRecord> getHistory() {
102+
private List<HistoryRecord> getHistory() {
104103
List<HistoryRecord> history = new ArrayList<>();
105104
try {
106105
byte[] historyBytes = deltaRuntimeContext.getState(KEY);

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import io.debezium.jdbc.TemporalPrecisionMode;
3939
import io.debezium.relational.Tables;
4040
import io.debezium.relational.ddl.DdlParser;
41-
import io.debezium.relational.history.HistoryRecord;
4241
import org.slf4j.Logger;
4342
import org.slf4j.LoggerFactory;
4443

@@ -59,8 +58,6 @@
5958
*/
6059
public class MySqlEventReader implements EventReader {
6160
public static final Logger LOG = LoggerFactory.getLogger(MySqlEventReader.class);
62-
63-
static final String SCHEMA_HISTORY_INDEX = "schema.history";
6461
private final MySqlConfig config;
6562
private final EventEmitter emitter;
6663
private final ExecutorService executorService;
@@ -95,14 +92,15 @@ public void start(Offset offset) {
9592
Map<String, SourceTable> sourceTableMap = sourceTables.stream().collect(
9693
Collectors.toMap(t -> config.getDatabase() + "." + t.getTable(), t -> t));
9794
Map<String, String> state = offset.get(); // state map is always not null
95+
String isSnapshot = state.getOrDefault(MySqlConstantOffsetBackingStore.SNAPSHOT, "");
9896
Configuration.Builder configBuilder = Configuration.create()
9997
.with("connector.class", MySqlConnector.class.getName())
10098
.with("offset.storage", MySqlConstantOffsetBackingStore.class.getName())
10199
.with("offset.flush.interval.ms", 1000)
102100
/* bind offset configs with debeziumConf */
103101
.with("file", state.getOrDefault(MySqlConstantOffsetBackingStore.FILE, ""))
104102
.with("pos", state.getOrDefault(MySqlConstantOffsetBackingStore.POS, ""))
105-
.with("snapshot", state.getOrDefault(MySqlConstantOffsetBackingStore.SNAPSHOT, ""))
103+
.with("snapshot", isSnapshot)
106104
.with("row", state.getOrDefault(MySqlConstantOffsetBackingStore.ROW, ""))
107105
.with("event", state.getOrDefault(MySqlConstantOffsetBackingStore.EVENT, ""))
108106
.with("gtids", state.getOrDefault(MySqlConstantOffsetBackingStore.GTID_SET, ""))
@@ -126,16 +124,17 @@ public void start(Offset offset) {
126124
Configuration debeziumConf = configBuilder.build();
127125
MySqlConnectorConfig mysqlConf = new MySqlConnectorConfig(debeziumConf);
128126
DBSchemaHistory.deltaRuntimeContext = context;
129-
/**
130-
* DBSchemaHistory stores the historical DDL changes. Each time Debezium detects a DDL change in binlog, it will
131-
* store a {@link HistoryRecord History Record} to DBSchemaHistory, in case of a failure or on purpose stop,
132-
* It knows where to restart from.
127+
/*
128+
* All snapshot events and schema history records have the same position/offset;
129+
* if the replicator was stopped or paused in the middle of a snapshot, it
130+
* will resume from the beginning.
133131
*/
134-
int schemaHistoryIndex = Integer.parseInt(state.getOrDefault(SCHEMA_HISTORY_INDEX, "-1"));
135-
try {
136-
DBSchemaHistory.restartFrom(schemaHistoryIndex);
137-
} catch (IOException e) {
138-
throw new RuntimeException("Unable to reset the schema history at start of replication.", e);
132+
if (offset.get().isEmpty() || "true".equalsIgnoreCase(isSnapshot)) {
133+
try {
134+
DBSchemaHistory.wipeHistory();
135+
} catch (IOException e) {
136+
throw new RuntimeException("Unable to wipe schema history at start of replication.", e);
137+
}
139138
}
140139

141140
MySqlValueConverters mySqlValueConverters = getValueConverters(mysqlConf);
@@ -148,7 +147,7 @@ public void start(Offset offset) {
148147
engine = EmbeddedEngine.create()
149148
.using(debeziumConf)
150149
.notifying(new MySqlRecordConsumer(context, emitter, ddlParser, mySqlValueConverters,
151-
new Tables(), sourceTableMap, schemaHistoryIndex))
150+
new Tables(), sourceTableMap))
152151
.using(new NotifyingCompletionCallback(context))
153152
.build();
154153
executorService.submit(engine);

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,16 @@ public class MySqlRecordConsumer implements Consumer<SourceRecord> {
5656
private final MySqlValueConverters mySqlValueConverters;
5757
private final Tables tables;
5858
private final Map<String, SourceTable> sourceTableMap;
59-
private int schemaHistoryIndex;
6059

6160
public MySqlRecordConsumer(DeltaSourceContext context, EventEmitter emitter,
6261
DdlParser ddlParser, MySqlValueConverters mySqlValueConverters,
63-
Tables tables, Map<String, SourceTable> sourceTableMap, int schemaHistoryIndex) {
62+
Tables tables, Map<String, SourceTable> sourceTableMap) {
6463
this.context = context;
6564
this.emitter = emitter;
6665
this.ddlParser = ddlParser;
6766
this.mySqlValueConverters = mySqlValueConverters;
6867
this.tables = tables;
6968
this.sourceTableMap = sourceTableMap;
70-
this.schemaHistoryIndex = schemaHistoryIndex;
7169
}
7270

7371
@Override
@@ -118,22 +116,23 @@ public void accept(SourceRecord sourceRecord) {
118116
return;
119117
}
120118

119+
Map<String, String> deltaOffset = generateCdapOffsets(sourceRecord);
120+
Offset recordOffset = new Offset(deltaOffset);
121121

122122
StructuredRecord val = Records.convert((Struct) sourceRecord.value());
123+
String ddl = val.get("ddl");
123124
StructuredRecord source = val.get("source");
124125
if (source == null) {
125126
// This should not happen, 'source' is a mandatory field in sourceRecord from debezium
126127
return;
127128
}
128-
boolean isSnapshot = Boolean.TRUE.equals(source.get(MySqlConstantOffsetBackingStore.SNAPSHOT));
129+
boolean isSnapshot = Boolean.TRUE.equals(deltaOffset.get(MySqlConstantOffsetBackingStore.SNAPSHOT));
129130
// If the map is empty, we should read all DDL/DML events and columns of all tables
130131
boolean readAllTables = sourceTableMap.isEmpty();
131132

132-
String ddl = val.get("ddl");
133-
Map<String, String> deltaOffset = generateCdapOffsets(sourceRecord);
134133
try {
135134
if (ddl != null) {
136-
handleDDL(ddl, deltaOffset, isSnapshot, readAllTables);
135+
handleDDL(ddl, recordOffset, isSnapshot, readAllTables);
137136
return;
138137
}
139138

@@ -144,16 +143,15 @@ public void accept(SourceRecord sourceRecord) {
144143
return;
145144
}
146145

147-
handleDML(source, val, deltaOffset, isSnapshot, readAllTables);
146+
handleDML(source, val, recordOffset, isSnapshot, readAllTables);
148147
} catch (InterruptedException e) {
149148
// happens when the event reader is stopped. throwing this exception tells Debezium to stop right away
150149
throw new StopConnectorException("Interrupted while emitting event.");
151150
}
152151
}
153152

154-
private void handleDML(StructuredRecord source, StructuredRecord val, Map<String, String> deltaOffset,
153+
private void handleDML(StructuredRecord source, StructuredRecord val, Offset recordOffset,
155154
boolean isSnapshot, boolean readAllTables) throws InterruptedException {
156-
deltaOffset.put(MySqlEventReader.SCHEMA_HISTORY_INDEX, String.valueOf(schemaHistoryIndex));
157155
String databaseName = source.get("db");
158156
String tableName = source.get("table");
159157
SourceTable sourceTable = getSourceTable(databaseName, tableName);
@@ -200,7 +198,7 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Map<String
200198

201199
Long ingestTime = val.get("ts_ms");
202200
DMLEvent.Builder builder = DMLEvent.builder()
203-
.setOffset(new Offset(deltaOffset))
201+
.setOffset(recordOffset)
204202
.setOperationType(op)
205203
.setDatabaseName(databaseName)
206204
.setTableName(tableName)
@@ -218,9 +216,8 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Map<String
218216
}
219217
}
220218

221-
private void handleDDL(String ddlStatement, Map<String, String> deltaOffset,
219+
private void handleDDL(String ddlStatement, Offset recordOffset,
222220
boolean isSnapshot, boolean readAllTables) throws InterruptedException {
223-
deltaOffset.put(MySqlEventReader.SCHEMA_HISTORY_INDEX, String.valueOf(++schemaHistoryIndex));
224221
ddlParser.getDdlChanges().reset();
225222
ddlParser.parse(ddlStatement, tables);
226223
AtomicReference<InterruptedException> interrupted = new AtomicReference<>();
@@ -230,7 +227,7 @@ private void handleDDL(String ddlStatement, Map<String, String> deltaOffset,
230227
}
231228
for (DdlParserListener.Event event : events) {
232229
DDLEvent.Builder builder = DDLEvent.builder()
233-
.setOffset(new Offset(deltaOffset))
230+
.setOffset(recordOffset)
234231
.setDatabaseName(databaseName)
235232
.setSnapshot(isSnapshot);
236233
DDLEvent ddlEvent = null;

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public void start(Offset offset) {
9898

9999
Map<String, String> state = offset.get(); // this will never be null
100100
// offset config
101+
String isSnapshotCompleted = state.getOrDefault(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED, "");
101102
Configuration debeziumConf = Configuration.create()
102103
.with("connector.class", SqlServerConnector.class.getName())
103104
.with("offset.storage", SqlServerConstantOffsetBackingStore.class.getName())
@@ -106,7 +107,7 @@ public void start(Offset offset) {
106107
.with("change_lsn", state.getOrDefault(SourceInfo.CHANGE_LSN_KEY, ""))
107108
.with("commit_lsn", state.getOrDefault(SourceInfo.COMMIT_LSN_KEY, ""))
108109
.with("snapshot", state.getOrDefault(SourceInfo.SNAPSHOT_KEY, ""))
109-
.with("snapshot_completed", state.getOrDefault(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED, ""))
110+
.with("snapshot_completed", isSnapshotCompleted)
110111
/* begin connector properties */
111112
.with("name", "delta")
112113
.with("database.hostname", config.getHost())
@@ -127,13 +128,11 @@ public void start(Offset offset) {
127128
new HashSet<>(Arrays.asList(snapshotTablesStr.split(SqlServerOffset.DELIMITER)));
128129

129130
/*
130-
this is required in scenarios where the source is able to emit the starting DDL events during snapshotting,
131-
but the target is unable to apply them. In that case, this reader will be created again, but it won't re-emit
132-
those DDL events unless the DB history is wiped. This only fixes handling of DDL errors that
133-
happen during the initial snapshot.
134-
TODO: (CDAP-16735) fix this more comprehensively
131+
* All snapshot events and schema history records have the same position/offset;
132+
* if the replicator was stopped or paused in the middle of a snapshot, it
133+
* will resume from the beginning.
135134
*/
136-
if (offset.get().isEmpty()) {
135+
if (offset.get().isEmpty() || !"true".equalsIgnoreCase(isSnapshotCompleted)) {
137136
try {
138137
DBSchemaHistory.wipeHistory();
139138
} catch (IOException e) {

0 commit comments

Comments
 (0)