Skip to content

Commit e580b7c

Browse files
committed
Merge branch 'develop' of github.com:data-integrations/database-delta-plugins into develop
2 parents 82d80fe + 5b4458f commit e580b7c

3 files changed

Lines changed: 45 additions & 27 deletions

File tree

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/DBSchemaHistory.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,21 @@ public class DBSchemaHistory extends AbstractDatabaseHistory {
3737
private static final String KEY = "history";
3838
// Hacky, fix when usage of EmbeddedEngine is replaced
3939
public static DeltaRuntimeContext deltaRuntimeContext;
40-
private final DocumentWriter writer = DocumentWriter.defaultWriter();
41-
private final DocumentReader reader = DocumentReader.defaultReader();
40+
private static final DocumentWriter writer = DocumentWriter.defaultWriter();
41+
private static final DocumentReader reader = DocumentReader.defaultReader();
4242

43+
/**
44+
 * Restart the DB schema history from a certain {@link HistoryRecord HistoryRecord} specified by the parameter.
45+
* @param index the position of the {@link HistoryRecord HistoryRecord} to restart from in the history list. A
46+
 * negative value will wipe out all the history.
47+
*/
48+
public static void restartFrom(int index) throws IOException {
49+
if (index < 0) {
50+
wipeHistory();
51+
return;
52+
}
53+
storeHistory(getHistory().subList(0, index + 1));
54+
}
4355
public static void wipeHistory() throws IOException {
4456
deltaRuntimeContext.putState(KEY, new byte[] { });
4557
}
@@ -48,6 +60,10 @@ public static void wipeHistory() throws IOException {
4860
protected synchronized void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
4961
List<HistoryRecord> history = getHistory();
5062
history.add(record);
63+
storeHistory(history);
64+
}
65+
66+
private static void storeHistory(List<HistoryRecord> history) {
5167
String historyStr = history.stream().map(r -> {
5268
try {
5369
return writer.write(r.document());
@@ -84,7 +100,7 @@ public boolean storageExists() {
84100
}
85101

86102
// TODO: cache history, should only have to read once
87-
private List<HistoryRecord> getHistory() {
103+
private static List<HistoryRecord> getHistory() {
88104
List<HistoryRecord> history = new ArrayList<>();
89105
try {
90106
byte[] historyBytes = deltaRuntimeContext.getState(KEY);

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.cdap.delta.mysql;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20-
import io.cdap.delta.api.DeltaFailureException;
2120
import io.cdap.delta.api.DeltaSourceContext;
2221
import io.cdap.delta.api.EventEmitter;
2322
import io.cdap.delta.api.EventReader;
@@ -39,6 +38,7 @@
3938
import io.debezium.jdbc.TemporalPrecisionMode;
4039
import io.debezium.relational.Tables;
4140
import io.debezium.relational.ddl.DdlParser;
41+
import io.debezium.relational.history.HistoryRecord;
4242
import org.slf4j.Logger;
4343
import org.slf4j.LoggerFactory;
4444

@@ -59,6 +59,8 @@
5959
*/
6060
public class MySqlEventReader implements EventReader {
6161
public static final Logger LOG = LoggerFactory.getLogger(MySqlEventReader.class);
62+
63+
static final String SCHEMA_HISTORY_INDEX = "schema.history";
6264
private final MySqlConfig config;
6365
private final EventEmitter emitter;
6466
private final ExecutorService executorService;
@@ -124,19 +126,16 @@ public void start(Offset offset) {
124126
Configuration debeziumConf = configBuilder.build();
125127
MySqlConnectorConfig mysqlConf = new MySqlConnectorConfig(debeziumConf);
126128
DBSchemaHistory.deltaRuntimeContext = context;
127-
/*
128-
this is required in scenarios where the source is able to emit the starting DDL events during snapshotting,
129-
but the target is unable to apply them. In that case, this reader will be created again, but it won't re-emit
130-
those DDL events unless the DB history is wiped. This only fixes handling of DDL errors that
131-
happen during the initial snapshot.
132-
TODO: (CDAP-16735) fix this more comprehensively
129+
/**
130+
 * DBSchemaHistory stores the historical DDL changes. Each time Debezium detects a DDL change in the binlog, it will
130+
 * store a {@link HistoryRecord HistoryRecord} to DBSchemaHistory, so that in case of a failure or an intentional stop,
131+
 * it knows where to restart from.
133133
*/
134-
if (offset.get().isEmpty()) {
135-
try {
136-
DBSchemaHistory.wipeHistory();
137-
} catch (IOException e) {
138-
throw new RuntimeException("Unable to wipe schema history at start of replication.", e);
139-
}
134+
int schemaHistoryIndex = Integer.parseInt(state.getOrDefault(SCHEMA_HISTORY_INDEX, "-1"));
135+
try {
136+
DBSchemaHistory.restartFrom(schemaHistoryIndex);
137+
} catch (IOException e) {
138+
throw new RuntimeException("Unable to reset the schema history at start of replication.", e);
140139
}
141140

142141
MySqlValueConverters mySqlValueConverters = getValueConverters(mysqlConf);
@@ -149,7 +148,7 @@ public void start(Offset offset) {
149148
engine = EmbeddedEngine.create()
150149
.using(debeziumConf)
151150
.notifying(new MySqlRecordConsumer(context, emitter, ddlParser, mySqlValueConverters,
152-
new Tables(), sourceTableMap))
151+
new Tables(), sourceTableMap, schemaHistoryIndex))
153152
.using(new NotifyingCompletionCallback(context))
154153
.build();
155154
executorService.submit(engine);

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,18 @@ public class MySqlRecordConsumer implements Consumer<SourceRecord> {
5656
private final MySqlValueConverters mySqlValueConverters;
5757
private final Tables tables;
5858
private final Map<String, SourceTable> sourceTableMap;
59+
private int schemaHistoryIndex;
5960

6061
public MySqlRecordConsumer(DeltaSourceContext context, EventEmitter emitter,
6162
DdlParser ddlParser, MySqlValueConverters mySqlValueConverters,
62-
Tables tables, Map<String, SourceTable> sourceTableMap) {
63+
Tables tables, Map<String, SourceTable> sourceTableMap, int schemaHistoryIndex) {
6364
this.context = context;
6465
this.emitter = emitter;
6566
this.ddlParser = ddlParser;
6667
this.mySqlValueConverters = mySqlValueConverters;
6768
this.tables = tables;
6869
this.sourceTableMap = sourceTableMap;
70+
this.schemaHistoryIndex = schemaHistoryIndex;
6971
}
7072

7173
@Override
@@ -116,11 +118,8 @@ public void accept(SourceRecord sourceRecord) {
116118
return;
117119
}
118120

119-
Map<String, String> deltaOffset = generateCdapOffsets(sourceRecord);
120-
Offset recordOffset = new Offset(deltaOffset);
121121

122122
StructuredRecord val = Records.convert((Struct) sourceRecord.value());
123-
String ddl = val.get("ddl");
124123
StructuredRecord source = val.get("source");
125124
if (source == null) {
126125
// This should not happen, 'source' is a mandatory field in sourceRecord from debezium
@@ -130,9 +129,11 @@ public void accept(SourceRecord sourceRecord) {
130129
// If the map is empty, we should read all DDL/DML events and columns of all tables
131130
boolean readAllTables = sourceTableMap.isEmpty();
132131

132+
String ddl = val.get("ddl");
133+
Map<String, String> deltaOffset = generateCdapOffsets(sourceRecord);
133134
try {
134135
if (ddl != null) {
135-
handleDDL(ddl, recordOffset, isSnapshot, readAllTables);
136+
handleDDL(ddl, deltaOffset, isSnapshot, readAllTables);
136137
return;
137138
}
138139

@@ -143,15 +144,16 @@ public void accept(SourceRecord sourceRecord) {
143144
return;
144145
}
145146

146-
handleDML(source, val, recordOffset, isSnapshot, readAllTables);
147+
handleDML(source, val, deltaOffset, isSnapshot, readAllTables);
147148
} catch (InterruptedException e) {
148149
// happens when the event reader is stopped. throwing this exception tells Debezium to stop right away
149150
throw new StopConnectorException("Interrupted while emitting event.");
150151
}
151152
}
152153

153-
private void handleDML(StructuredRecord source, StructuredRecord val, Offset recordOffset,
154+
private void handleDML(StructuredRecord source, StructuredRecord val, Map<String, String> deltaOffset,
154155
boolean isSnapshot, boolean readAllTables) throws InterruptedException {
156+
deltaOffset.put(MySqlEventReader.SCHEMA_HISTORY_INDEX, String.valueOf(schemaHistoryIndex));
155157
String databaseName = source.get("db");
156158
String tableName = source.get("table");
157159
SourceTable sourceTable = getSourceTable(databaseName, tableName);
@@ -198,7 +200,7 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Offset rec
198200

199201
Long ingestTime = val.get("ts_ms");
200202
DMLEvent.Builder builder = DMLEvent.builder()
201-
.setOffset(recordOffset)
203+
.setOffset(new Offset(deltaOffset))
202204
.setOperationType(op)
203205
.setDatabaseName(databaseName)
204206
.setTableName(tableName)
@@ -216,8 +218,9 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Offset rec
216218
}
217219
}
218220

219-
private void handleDDL(String ddlStatement, Offset recordOffset,
221+
private void handleDDL(String ddlStatement, Map<String, String> deltaOffset,
220222
boolean isSnapshot, boolean readAllTables) throws InterruptedException {
223+
deltaOffset.put(MySqlEventReader.SCHEMA_HISTORY_INDEX, String.valueOf(++schemaHistoryIndex));
221224
ddlParser.getDdlChanges().reset();
222225
ddlParser.parse(ddlStatement, tables);
223226
AtomicReference<InterruptedException> interrupted = new AtomicReference<>();
@@ -227,7 +230,7 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
227230
}
228231
for (DdlParserListener.Event event : events) {
229232
DDLEvent.Builder builder = DDLEvent.builder()
230-
.setOffset(recordOffset)
233+
.setOffset(new Offset(deltaOffset))
231234
.setDatabaseName(databaseName)
232235
.setSnapshot(isSnapshot);
233236
DDLEvent ddlEvent = null;

0 commit comments

Comments
 (0)