Skip to content

Commit 798106e

Browse files
committed
Set the record offset (specifically 'snapshot' and 'snapshot_completed' fields) explicitly based on the incoming source record for the SQL server source.
1 parent 35f54be commit 798106e

2 files changed

Lines changed: 26 additions & 29 deletions

File tree

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerOffset.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,28 @@ public class SqlServerOffset {
3737
private final String commitLsn;
3838
private final Boolean isSnapshot;
3939
private final Boolean isSnapshotCompleted;
40-
private Set<String> ddlEventSent;
40+
private final Set<String> ddlEventSent;
4141

42-
SqlServerOffset(Map<String, ?> properties) {
42+
SqlServerOffset(Map<String, ?> properties, Set<String> ddlEventSent) {
4343
this.changeLsn = (String) properties.get(SourceInfo.CHANGE_LSN_KEY);
4444
this.commitLsn = (String) properties.get(SourceInfo.COMMIT_LSN_KEY);
45-
this.isSnapshot = (Boolean) properties.get(SourceInfo.SNAPSHOT_KEY);
46-
this.isSnapshotCompleted = (Boolean) properties.get(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED);
47-
this.ddlEventSent = new HashSet<>();
45+
if (properties.containsKey(SourceInfo.SNAPSHOT_KEY)) {
46+
this.isSnapshot = (Boolean) properties.get(SourceInfo.SNAPSHOT_KEY);
47+
} else {
48+
this.isSnapshot = false;
49+
}
50+
if (properties.containsKey(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED)) {
51+
this.isSnapshotCompleted = (Boolean) properties.get(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED);
52+
} else {
53+
this.isSnapshotCompleted = true;
54+
}
55+
this.ddlEventSent = ddlEventSent;
4856
}
4957

5058
boolean isSnapshot() {
5159
return isSnapshot;
5260
}
5361

54-
void setDdlEventSent(Set<String> ddlEventSent) {
55-
this.ddlEventSent = new HashSet<>(ddlEventSent);
56-
}
57-
58-
void addSnapshotTable(String table) {
59-
ddlEventSent.add(table);
60-
}
61-
6262
Offset getAsOffset() {
6363
Map<String, String> deltaOffset = new HashMap<>();
6464
if (changeLsn != null) {

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class SqlServerRecordConsumer implements Consumer<SourceRecord> {
5555
private final Set<String> ddlEventSent;
5656
private final Map<String, SourceTable> sourceTableMap;
5757
private final boolean replicateExistingData;
58-
private Offset latestOffset;
58+
private final Offset latestOffset;
5959

6060

6161
SqlServerRecordConsumer(DeltaSourceContext context, EventEmitter emitter, String databaseName,
@@ -81,21 +81,17 @@ public void accept(SourceRecord sourceRecord) {
8181
return;
8282
}
8383

84-
SqlServerOffset sqlServerOffset = new SqlServerOffset(sourceRecord.sourceOffset());
8584
// ignore duplicated CDC event
8685
// SQLServer connector will relay the last event at the offset
8786
// to be safe here we check whether it's before or at the same offset
8887
// snapshotting will resume from beginning, and the whole table that is partly snapshotted
8988
// is supposed to be dropped first , thus no need to consider
89+
SqlServerOffset sqlServerOffset = new SqlServerOffset(sourceRecord.sourceOffset(), ddlEventSent);
9090
if (!sqlServerOffset.isSnapshot() && sqlServerOffset.isBeforeOrAt(latestOffset)) {
9191
LOG.debug("Got duplicated event {} ", sourceRecord);
9292
return;
9393
}
9494

95-
sqlServerOffset.setDdlEventSent(ddlEventSent);
96-
boolean isSnapshot = sqlServerOffset.isSnapshot();
97-
latestOffset = sqlServerOffset.getAsOffset();
98-
9995
StructuredRecord val = Records.convert((Struct) sourceRecord.value());
10096
DMLOperation.Type op;
10197
String opStr = val.get("op");
@@ -145,23 +141,23 @@ public void accept(SourceRecord sourceRecord) {
145141
return;
146142
}
147143

148-
DDLEvent.Builder builder = DDLEvent.builder()
149-
.setDatabaseName(databaseName)
150-
.setSnapshot(isSnapshot);
151-
152144
Schema schema = value.getSchema();
153145
// send the ddl events only if we see the table at the first time
154146
// Note: the delta app itself have prevented adding CREATE_TABLE operation into DDL blacklist for all the tables.
155147
if (!ddlEventSent.contains(sourceTableId)) {
148+
SqlServerOffset ddlRecordOffset = new SqlServerOffset(sourceRecord.sourceOffset(), ddlEventSent);
149+
150+
DDLEvent.Builder builder = DDLEvent.builder()
151+
.setDatabaseName(databaseName)
152+
.setSnapshot(ddlRecordOffset.isSnapshot())
153+
.setOffset(ddlRecordOffset.getAsOffset());
154+
156155
StructuredRecord key = Records.convert((Struct) sourceRecord.key());
157156
List<Schema.Field> fields = key.getSchema().getFields();
158157
List<String> primaryFields = new ArrayList<>();
159158
if (fields != null && !fields.isEmpty()) {
160159
primaryFields = fields.stream().map(Schema.Field::getName).collect(Collectors.toList());
161160
}
162-
sqlServerOffset.addSnapshotTable(sourceTableId);
163-
Offset recordOffset = sqlServerOffset.getAsOffset();
164-
builder.setOffset(recordOffset);
165161

166162
try {
167163
if (replicateExistingData) {
@@ -187,23 +183,24 @@ public void accept(SourceRecord sourceRecord) {
187183
// happens when the event reader is stopped. throwing this exception tells Debezium to stop right away
188184
throw new StopConnectorException("Interrupted while emitting an event.");
189185
}
190-
ddlEventSent.add(sourceTableId);
191186
}
192187

193188
if (!readAllTables && sourceTable.getDmlBlacklist().contains(op)) {
194189
// do nothing due to it was not set to read all tables and the DML op has been blacklisted for this table
195190
return;
196191
}
197192

193+
ddlEventSent.add(sourceTableId);
194+
SqlServerOffset dmlRecordOffset = new SqlServerOffset(sourceRecord.sourceOffset(), ddlEventSent);
198195
Long ingestTime = val.get("ts_ms");
199196
DMLEvent.Builder dmlBuilder = DMLEvent.builder()
200-
.setOffset(latestOffset)
197+
.setOffset(dmlRecordOffset.getAsOffset())
201198
.setOperationType(op)
202199
.setDatabaseName(databaseName)
203200
.setSchemaName(schemaName)
204201
.setTableName(tableName)
205202
.setRow(value)
206-
.setSnapshot(isSnapshot)
203+
.setSnapshot(dmlRecordOffset.isSnapshot())
207204
.setTransactionId(null)
208205
.setIngestTimestamp(ingestTime == null ? 0L : ingestTime);
209206

0 commit comments

Comments
 (0)