Skip to content

Commit cdc971b

Browse files
ignore duplicate events (#133)
* ignore duplicate events * use configured Comparator to compare history records
1 parent 1e8d1f7 commit cdc971b

4 files changed

Lines changed: 44 additions & 6 deletions

File tree

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/DBSchemaHistory.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
import io.cdap.cdap.api.common.Bytes;
2020
import io.cdap.delta.api.DeltaRuntimeContext;
21+
import io.debezium.config.Configuration;
2122
import io.debezium.connector.AbstractSourceInfo;
2223
import io.debezium.document.DocumentReader;
2324
import io.debezium.document.DocumentWriter;
2425
import io.debezium.relational.history.AbstractDatabaseHistory;
2526
import io.debezium.relational.history.DatabaseHistoryException;
27+
import io.debezium.relational.history.DatabaseHistoryListener;
2628
import io.debezium.relational.history.HistoryRecord;
2729
import io.debezium.relational.history.HistoryRecordComparator;
2830

@@ -41,11 +43,19 @@ public class DBSchemaHistory extends AbstractDatabaseHistory {
4143
public static DeltaRuntimeContext deltaRuntimeContext;
4244
private final DocumentWriter writer = DocumentWriter.defaultWriter();
4345
private final DocumentReader reader = DocumentReader.defaultReader();
46+
private HistoryRecordComparator comparator;
4447

4548
public static void wipeHistory() throws IOException {
4649
deltaRuntimeContext.putState(KEY, new byte[] { });
4750
}
4851

52+
@Override
53+
public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener,
54+
boolean useCatalogBeforeSchema) {
55+
this.comparator = comparator == null ? HistoryRecordComparator.INSTANCE : comparator;
56+
super.configure(config, comparator, listener, useCatalogBeforeSchema);
57+
}
58+
4959
@Override
5060
protected synchronized void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
5161
List<HistoryRecord> history = getHistory();
@@ -59,7 +69,7 @@ protected synchronized void storeRecord(HistoryRecord record) throws DatabaseHis
5969
// And all snapshot history records will have the same position
6070
if (Boolean.TRUE != record.document().getDocument(HistoryRecord.Fields.POSITION)
6171
.getBoolean(AbstractSourceInfo.SNAPSHOT_KEY) && !history.isEmpty() &&
62-
HistoryRecordComparator.INSTANCE.isAtOrBefore(record, history.get(history.size() - 1))) {
72+
comparator.isAtOrBefore(record, history.get(history.size() - 1))) {
6373
return;
6474
}
6575
history.add(record);

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public void start(Offset offset) {
147147
LOG.info("creating new EmbeddedEngine...");
148148
// Create the engine with this configuration ...
149149
engine = EmbeddedEngine.create()
150-
.notifying(new SqlServerRecordConsumer(context, emitter, databaseName, snapshotTables, sourceTableMap))
150+
.notifying(new SqlServerRecordConsumer(context, emitter, databaseName, snapshotTables, sourceTableMap, offset))
151151
.using(debeziumConf)
152152
.using(new NotifyingCompletionCallback(context))
153153
.build();

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerOffset.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.delta.sqlserver;
1818

1919
import io.cdap.delta.api.Offset;
20+
import io.debezium.connector.sqlserver.Lsn;
2021
import io.debezium.connector.sqlserver.SourceInfo;
2122

2223
import java.util.HashMap;
@@ -79,6 +80,19 @@ Offset getAsOffset() {
7980
return new Offset(deltaOffset);
8081
}
8182

83+
/**
84+
* Returns whether this {@link SqlServerOffset SqlServerOffset} instance is before or at the specified delta
85+
* offset. If it's true, that means this {@link SqlServerOffset SqlServerOffset} was once seen by the SQLServer
86+
* Debezium connector at the specified delta offset
87+
* @param deltaOffset the delta offset to compare
88+
* @return whether this {@link SqlServerOffset SqlServerOffset} instance is before or at the specified delta
89+
* offset.
90+
*/
91+
public boolean isBeforeOrAt(Offset deltaOffset) {
92+
return Lsn.valueOf(this.changeLsn)
93+
.compareTo(Lsn.valueOf(deltaOffset.get().get(SourceInfo.CHANGE_LSN_KEY))) < 1;
94+
}
95+
8296
@Override
8397
public boolean equals(Object o) {
8498
if (this == o) {

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,18 @@ public class SqlServerRecordConsumer implements Consumer<SourceRecord> {
5353
private final String databaseName;
5454
private final Set<String> snapshotTables;
5555
private final Map<String, SourceTable> sourceTableMap;
56+
private Offset latestOffset;
57+
5658

5759
SqlServerRecordConsumer(DeltaSourceContext context, EventEmitter emitter, String databaseName,
58-
Set<String> snapshotTables, Map<String, SourceTable> sourceTableMap) {
60+
Set<String> snapshotTables, Map<String, SourceTable> sourceTableMap,
61+
Offset latestOffset) {
5962
this.context = context;
6063
this.emitter = emitter;
6164
this.databaseName = databaseName;
6265
this.snapshotTables = snapshotTables;
6366
this.sourceTableMap = sourceTableMap;
67+
this.latestOffset = latestOffset;
6468
}
6569

6670
@Override
@@ -75,9 +79,19 @@ public void accept(SourceRecord sourceRecord) {
7579
}
7680

7781
SqlServerOffset sqlServerOffset = new SqlServerOffset(sourceRecord.sourceOffset());
82+
// ignore duplicated CDC event
83+
// SQLServer connector will replay the last event at the offset
84+
// to be safe here we check whether it's before or at the same offset
85+
// snapshotting will resume from beginning, and the whole table that is partly snapshotted
86+
// is supposed to be dropped first, thus there is no need to consider snapshot events here
87+
if (!sqlServerOffset.isSnapshot() && sqlServerOffset.isBeforeOrAt(latestOffset)) {
88+
LOG.debug("Got duplicated event {} ", sourceRecord);
89+
return;
90+
}
91+
7892
sqlServerOffset.setSnapshotTables(snapshotTables);
7993
boolean isSnapshot = sqlServerOffset.isSnapshot();
80-
Offset recordOffset = sqlServerOffset.getAsOffset();
94+
latestOffset = sqlServerOffset.getAsOffset();
8195

8296
StructuredRecord val = Records.convert((Struct) sourceRecord.value());
8397
DMLOperation.Type op;
@@ -143,7 +157,7 @@ public void accept(SourceRecord sourceRecord) {
143157
primaryFields = fields.stream().map(Schema.Field::getName).collect(Collectors.toList());
144158
}
145159
sqlServerOffset.addSnapshotTable(sourceTableId);
146-
recordOffset = sqlServerOffset.getAsOffset();
160+
Offset recordOffset = sqlServerOffset.getAsOffset();
147161
builder.setOffset(recordOffset);
148162

149163
try {
@@ -174,7 +188,7 @@ public void accept(SourceRecord sourceRecord) {
174188

175189
Long ingestTime = val.get("ts_ms");
176190
DMLEvent.Builder dmlBuilder = DMLEvent.builder()
177-
.setOffset(recordOffset)
191+
.setOffset(latestOffset)
178192
.setOperationType(op)
179193
.setDatabaseName(databaseName)
180194
.setTableName(tableName)

0 commit comments

Comments
 (0)