Skip to content

Commit 72c563c

Browse files
Revert "CDAP-16735 fix DBSchemaHistory"
This reverts commit 5b4458f.
1 parent 5b4458f commit 72c563c

3 files changed

Lines changed: 27 additions & 45 deletions

File tree

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/DBSchemaHistory.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,9 @@ public class DBSchemaHistory extends AbstractDatabaseHistory {
3737
private static final String KEY = "history";
3838
// Hacky, fix when usage of EmbeddedEngine is replaced
3939
public static DeltaRuntimeContext deltaRuntimeContext;
40-
private static final DocumentWriter writer = DocumentWriter.defaultWriter();
41-
private static final DocumentReader reader = DocumentReader.defaultReader();
40+
private final DocumentWriter writer = DocumentWriter.defaultWriter();
41+
private final DocumentReader reader = DocumentReader.defaultReader();
4242

43-
/**
44-
* Restart the DB schema history from certain {@link HistoryRecord HistoryRecord} specified by the parameter
45-
* @param index the position of the {@link HistoryRecord HistoryRecord} to restart from in the history list. A
46-
* negative value will wipe out all the history
47-
*/
48-
public static void restartFrom(int index) throws IOException {
49-
if (index < 0) {
50-
wipeHistory();
51-
return;
52-
}
53-
storeHistory(getHistory().subList(0, index + 1));
54-
}
5543
public static void wipeHistory() throws IOException {
5644
deltaRuntimeContext.putState(KEY, new byte[] { });
5745
}
@@ -60,10 +48,6 @@ public static void wipeHistory() throws IOException {
6048
protected synchronized void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
6149
List<HistoryRecord> history = getHistory();
6250
history.add(record);
63-
storeHistory(history);
64-
}
65-
66-
private static void storeHistory(List<HistoryRecord> history) {
6751
String historyStr = history.stream().map(r -> {
6852
try {
6953
return writer.write(r.document());
@@ -100,7 +84,7 @@ public boolean storageExists() {
10084
}
10185

10286
// TODO: cache history, should only have to read once
103-
private static List<HistoryRecord> getHistory() {
87+
private List<HistoryRecord> getHistory() {
10488
List<HistoryRecord> history = new ArrayList<>();
10589
try {
10690
byte[] historyBytes = deltaRuntimeContext.getState(KEY);

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.delta.mysql;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import io.cdap.delta.api.DeltaFailureException;
2021
import io.cdap.delta.api.DeltaSourceContext;
2122
import io.cdap.delta.api.EventEmitter;
2223
import io.cdap.delta.api.EventReader;
@@ -38,7 +39,6 @@
3839
import io.debezium.jdbc.TemporalPrecisionMode;
3940
import io.debezium.relational.Tables;
4041
import io.debezium.relational.ddl.DdlParser;
41-
import io.debezium.relational.history.HistoryRecord;
4242
import org.slf4j.Logger;
4343
import org.slf4j.LoggerFactory;
4444

@@ -59,8 +59,6 @@
5959
*/
6060
public class MySqlEventReader implements EventReader {
6161
public static final Logger LOG = LoggerFactory.getLogger(MySqlEventReader.class);
62-
63-
static final String SCHEMA_HISTORY_INDEX = "schema.history";
6462
private final MySqlConfig config;
6563
private final EventEmitter emitter;
6664
private final ExecutorService executorService;
@@ -126,16 +124,19 @@ public void start(Offset offset) {
126124
Configuration debeziumConf = configBuilder.build();
127125
MySqlConnectorConfig mysqlConf = new MySqlConnectorConfig(debeziumConf);
128126
DBSchemaHistory.deltaRuntimeContext = context;
129-
/**
130-
* DBSchemaHistory stores the historical DDL changes. Each time Debezium detects a DDL change in binlog, it will
131-
* store a {@link HistoryRecord History Record} to DBSchemaHistory, in case of a failure or on purpose stop,
132-
* It knows where to restart from.
127+
/*
128+
this is required in scenarios where the source is able to emit the starting DDL events during snapshotting,
129+
but the target is unable to apply them. In that case, this reader will be created again, but it won't re-emit
130+
those DDL events unless the DB history is wiped. This only fixes handling of DDL errors that
131+
happen during the initial snapshot.
132+
TODO: (CDAP-16735) fix this more comprehensively
133133
*/
134-
int schemaHistoryIndex = Integer.parseInt(state.getOrDefault(SCHEMA_HISTORY_INDEX, "-1"));
135-
try {
136-
DBSchemaHistory.restartFrom(schemaHistoryIndex);
137-
} catch (IOException e) {
138-
throw new RuntimeException("Unable to reset the schema history at start of replication.", e);
134+
if (offset.get().isEmpty()) {
135+
try {
136+
DBSchemaHistory.wipeHistory();
137+
} catch (IOException e) {
138+
throw new RuntimeException("Unable to wipe schema history at start of replication.", e);
139+
}
139140
}
140141

141142
MySqlValueConverters mySqlValueConverters = getValueConverters(mysqlConf);
@@ -148,7 +149,7 @@ public void start(Offset offset) {
148149
engine = EmbeddedEngine.create()
149150
.using(debeziumConf)
150151
.notifying(new MySqlRecordConsumer(context, emitter, ddlParser, mySqlValueConverters,
151-
new Tables(), sourceTableMap, schemaHistoryIndex))
152+
new Tables(), sourceTableMap))
152153
.using(new NotifyingCompletionCallback(context))
153154
.build();
154155
executorService.submit(engine);

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,16 @@ public class MySqlRecordConsumer implements Consumer<SourceRecord> {
5656
private final MySqlValueConverters mySqlValueConverters;
5757
private final Tables tables;
5858
private final Map<String, SourceTable> sourceTableMap;
59-
private int schemaHistoryIndex;
6059

6160
public MySqlRecordConsumer(DeltaSourceContext context, EventEmitter emitter,
6261
DdlParser ddlParser, MySqlValueConverters mySqlValueConverters,
63-
Tables tables, Map<String, SourceTable> sourceTableMap, int schemaHistoryIndex) {
62+
Tables tables, Map<String, SourceTable> sourceTableMap) {
6463
this.context = context;
6564
this.emitter = emitter;
6665
this.ddlParser = ddlParser;
6766
this.mySqlValueConverters = mySqlValueConverters;
6867
this.tables = tables;
6968
this.sourceTableMap = sourceTableMap;
70-
this.schemaHistoryIndex = schemaHistoryIndex;
7169
}
7270

7371
@Override
@@ -118,8 +116,11 @@ public void accept(SourceRecord sourceRecord) {
118116
return;
119117
}
120118

119+
Map<String, String> deltaOffset = generateCdapOffsets(sourceRecord);
120+
Offset recordOffset = new Offset(deltaOffset);
121121

122122
StructuredRecord val = Records.convert((Struct) sourceRecord.value());
123+
String ddl = val.get("ddl");
123124
StructuredRecord source = val.get("source");
124125
if (source == null) {
125126
// This should not happen, 'source' is a mandatory field in sourceRecord from debezium
@@ -129,11 +130,9 @@ public void accept(SourceRecord sourceRecord) {
129130
// If the map is empty, we should read all DDL/DML events and columns of all tables
130131
boolean readAllTables = sourceTableMap.isEmpty();
131132

132-
String ddl = val.get("ddl");
133-
Map<String, String> deltaOffset = generateCdapOffsets(sourceRecord);
134133
try {
135134
if (ddl != null) {
136-
handleDDL(ddl, deltaOffset, isSnapshot, readAllTables);
135+
handleDDL(ddl, recordOffset, isSnapshot, readAllTables);
137136
return;
138137
}
139138

@@ -144,16 +143,15 @@ public void accept(SourceRecord sourceRecord) {
144143
return;
145144
}
146145

147-
handleDML(source, val, deltaOffset, isSnapshot, readAllTables);
146+
handleDML(source, val, recordOffset, isSnapshot, readAllTables);
148147
} catch (InterruptedException e) {
149148
// happens when the event reader is stopped. throwing this exception tells Debezium to stop right away
150149
throw new StopConnectorException("Interrupted while emitting event.");
151150
}
152151
}
153152

154-
private void handleDML(StructuredRecord source, StructuredRecord val, Map<String, String> deltaOffset,
153+
private void handleDML(StructuredRecord source, StructuredRecord val, Offset recordOffset,
155154
boolean isSnapshot, boolean readAllTables) throws InterruptedException {
156-
deltaOffset.put(MySqlEventReader.SCHEMA_HISTORY_INDEX, String.valueOf(schemaHistoryIndex));
157155
String databaseName = source.get("db");
158156
String tableName = source.get("table");
159157
SourceTable sourceTable = getSourceTable(databaseName, tableName);
@@ -200,7 +198,7 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Map<String
200198

201199
Long ingestTime = val.get("ts_ms");
202200
DMLEvent.Builder builder = DMLEvent.builder()
203-
.setOffset(new Offset(deltaOffset))
201+
.setOffset(recordOffset)
204202
.setOperationType(op)
205203
.setDatabaseName(databaseName)
206204
.setTableName(tableName)
@@ -218,9 +216,8 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Map<String
218216
}
219217
}
220218

221-
private void handleDDL(String ddlStatement, Map<String, String> deltaOffset,
219+
private void handleDDL(String ddlStatement, Offset recordOffset,
222220
boolean isSnapshot, boolean readAllTables) throws InterruptedException {
223-
deltaOffset.put(MySqlEventReader.SCHEMA_HISTORY_INDEX, String.valueOf(++schemaHistoryIndex));
224221
ddlParser.getDdlChanges().reset();
225222
ddlParser.parse(ddlStatement, tables);
226223
AtomicReference<InterruptedException> interrupted = new AtomicReference<>();
@@ -230,7 +227,7 @@ private void handleDDL(String ddlStatement, Map<String, String> deltaOffset,
230227
}
231228
for (DdlParserListener.Event event : events) {
232229
DDLEvent.Builder builder = DDLEvent.builder()
233-
.setOffset(new Offset(deltaOffset))
230+
.setOffset(recordOffset)
234231
.setDatabaseName(databaseName)
235232
.setSnapshot(isSnapshot);
236233
DDLEvent ddlEvent = null;

0 commit comments

Comments
 (0)