Skip to content

Commit 2328cc9

Browse files
CDAP-17584 clear the snapshotting table list when resume from middle of the snapshotting (#140)
1 parent 7222899 commit 2328cc9

3 files changed

Lines changed: 22 additions & 20 deletions

File tree

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,16 +125,17 @@ public void start(Offset offset) {
125125

126126
DBSchemaHistory.deltaRuntimeContext = context;
127127

128-
String snapshotTablesStr = state.get(SqlServerOffset.SNAPSHOT_TABLES);
129-
Set<String> snapshotTables = Strings.isNullOrEmpty(snapshotTablesStr) ? new HashSet<>() :
130-
new HashSet<>(Arrays.asList(snapshotTablesStr.split(SqlServerOffset.DELIMITER)));
128+
String ddlEventSentStr = state.get(SqlServerOffset.DDL_EVENT_SENT);
129+
Set<String> ddlEventSent = Strings.isNullOrEmpty(ddlEventSentStr) ? new HashSet<>() :
130+
new HashSet<>(Arrays.asList(ddlEventSentStr.split(SqlServerOffset.DELIMITER)));
131131

132132
/*
133133
* All snapshot events or schema history record have same position/offset
134134
* if replicator was stopped or paused from middle of snapshot, it
135135
* will resume from beginning.
136136
*/
137137
if (offset.get().isEmpty() || !"true".equalsIgnoreCase(isSnapshotCompleted)) {
138+
ddlEventSent.clear();
138139
try {
139140
DBSchemaHistory.wipeHistory();
140141
} catch (IOException e) {
@@ -149,7 +150,7 @@ public void start(Offset offset) {
149150
LOG.info("creating new EmbeddedEngine...");
150151
// Create the engine with this configuration ...
151152
engine = EmbeddedEngine.create()
152-
.notifying(new SqlServerRecordConsumer(context, emitter, databaseName, snapshotTables, sourceTableMap, offset))
153+
.notifying(new SqlServerRecordConsumer(context, emitter, databaseName, ddlEventSent, sourceTableMap, offset))
153154
.using(debeziumConf)
154155
.using(new NotifyingCompletionCallback(context))
155156
.build();

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerOffset.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,32 +31,32 @@
3131
*/
3232
public class SqlServerOffset {
3333
static final String DELIMITER = ",";
34-
static final String SNAPSHOT_TABLES = "snapshot_tables";
34+
static final String DDL_EVENT_SENT = "ddl_event_sent";
3535

3636
private final String changeLsn;
3737
private final String commitLsn;
3838
private final Boolean isSnapshot;
3939
private final Boolean isSnapshotCompleted;
40-
private Set<String> snapshotTables;
40+
private Set<String> ddlEventSent;
4141

4242
SqlServerOffset(Map<String, ?> properties) {
4343
this.changeLsn = (String) properties.get(SourceInfo.CHANGE_LSN_KEY);
4444
this.commitLsn = (String) properties.get(SourceInfo.COMMIT_LSN_KEY);
4545
this.isSnapshot = (Boolean) properties.get(SourceInfo.SNAPSHOT_KEY);
4646
this.isSnapshotCompleted = (Boolean) properties.get(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED);
47-
this.snapshotTables = new HashSet<>();
47+
this.ddlEventSent = new HashSet<>();
4848
}
4949

5050
boolean isSnapshot() {
5151
return isSnapshot;
5252
}
5353

54-
void setSnapshotTables(Set<String> snapshotTables) {
55-
this.snapshotTables = new HashSet<>(snapshotTables);
54+
void setDdlEventSent(Set<String> ddlEventSent) {
55+
this.ddlEventSent = new HashSet<>(ddlEventSent);
5656
}
5757

5858
void addSnapshotTable(String table) {
59-
snapshotTables.add(table);
59+
ddlEventSent.add(table);
6060
}
6161

6262
Offset getAsOffset() {
@@ -73,8 +73,8 @@ Offset getAsOffset() {
7373
if (isSnapshotCompleted != null) {
7474
deltaOffset.put(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED, String.valueOf(isSnapshotCompleted));
7575
}
76-
if (snapshotTables != null && !snapshotTables.isEmpty()) {
77-
deltaOffset.put(SNAPSHOT_TABLES, String.join(DELIMITER, snapshotTables));
76+
if (ddlEventSent != null && !ddlEventSent.isEmpty()) {
77+
deltaOffset.put(DDL_EVENT_SENT, String.join(DELIMITER, ddlEventSent));
7878
}
7979

8080
return new Offset(deltaOffset);
@@ -106,11 +106,11 @@ public boolean equals(Object o) {
106106
&& Objects.equals(commitLsn, that.commitLsn)
107107
&& Objects.equals(isSnapshot, that.isSnapshot)
108108
&& Objects.equals(isSnapshotCompleted, that.isSnapshotCompleted)
109-
&& Objects.equals(snapshotTables, that.snapshotTables);
109+
&& Objects.equals(ddlEventSent, that.ddlEventSent);
110110
}
111111

112112
@Override
113113
public int hashCode() {
114-
return Objects.hash(changeLsn, commitLsn, isSnapshot, isSnapshotCompleted, snapshotTables);
114+
return Objects.hash(changeLsn, commitLsn, isSnapshot, isSnapshotCompleted, ddlEventSent);
115115
}
116116
}

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,19 @@ public class SqlServerRecordConsumer implements Consumer<SourceRecord> {
5151
private final EventEmitter emitter;
5252
// we need this since there is no way to get the db information from the source record
5353
private final String databaseName;
54-
private final Set<String> snapshotTables;
54+
// record tables that already had DDL events sent
55+
private final Set<String> ddlEventSent;
5556
private final Map<String, SourceTable> sourceTableMap;
5657
private Offset latestOffset;
5758

5859

5960
SqlServerRecordConsumer(DeltaSourceContext context, EventEmitter emitter, String databaseName,
60-
Set<String> snapshotTables, Map<String, SourceTable> sourceTableMap,
61+
Set<String> ddlEventSent, Map<String, SourceTable> sourceTableMap,
6162
Offset latestOffset) {
6263
this.context = context;
6364
this.emitter = emitter;
6465
this.databaseName = databaseName;
65-
this.snapshotTables = snapshotTables;
66+
this.ddlEventSent = ddlEventSent;
6667
this.sourceTableMap = sourceTableMap;
6768
this.latestOffset = latestOffset;
6869
}
@@ -89,7 +90,7 @@ public void accept(SourceRecord sourceRecord) {
8990
return;
9091
}
9192

92-
sqlServerOffset.setSnapshotTables(snapshotTables);
93+
sqlServerOffset.setDdlEventSent(ddlEventSent);
9394
boolean isSnapshot = sqlServerOffset.isSnapshot();
9495
latestOffset = sqlServerOffset.getAsOffset();
9596

@@ -149,7 +150,7 @@ public void accept(SourceRecord sourceRecord) {
149150
Schema schema = value.getSchema();
150151
// send the ddl events only if we see the table at the first time
151152
// Note: the delta app itself have prevented adding CREATE_TABLE operation into DDL blacklist for all the tables.
152-
if (!snapshotTables.contains(sourceTableId)) {
153+
if (!ddlEventSent.contains(sourceTableId)) {
153154
StructuredRecord key = Records.convert((Struct) sourceRecord.key());
154155
List<Schema.Field> fields = key.getSchema().getFields();
155156
List<String> primaryFields = new ArrayList<>();
@@ -182,7 +183,7 @@ public void accept(SourceRecord sourceRecord) {
182183
// happens when the event reader is stopped. throwing this exception tells Debezium to stop right away
183184
throw new StopConnectorException("Interrupted while emitting an event.");
184185
}
185-
snapshotTables.add(sourceTableId);
186+
ddlEventSent.add(sourceTableId);
186187
}
187188

188189
if (!readAllTables && sourceTable.getDmlBlacklist().contains(op)) {

0 commit comments

Comments
 (0)