Skip to content

Commit fac86bd

Browse files
committed
CDAP-17482 Add option for the source configurations to ignore replicating any existing data.
1 parent 2f043d9 commit fac86bd

10 files changed

Lines changed: 98 additions & 13 deletions

File tree

mysql-delta-plugins/docs/mysql-cdcSource.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ to the MySQL server will be of the form 'user_name'@'%' where user_name is this
7373

7474
**JDBC Plugin Name:** Identifier for the MySQL JDBC driver, which is the name used while uploading the MySQL JDBC driver.
7575

76+
**Replicate Existing Data:** Whether to replicate existing data from the source database. By default, the pipeline
77+
replicates the existing data from the source tables. If set to false, any existing data in the source tables is
78+
ignored and only changes that happen after the pipeline starts are replicated.
79+
7680
Troubleshooting
7781
-----------
7882
If the replicator is able to start snapshotting the data, but fails when it switches over to read from the

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ public class MySqlConfig extends PluginConfig {
5757
@Description("Identifier for the MySQL JDBC driver, which is the name used while uploading the MySQL JDBC driver.")
5858
private String jdbcPluginName;
5959

60+
@Nullable
61+
@Description("Whether to replicate existing data from the source database. By default, pipeline will replicate " +
62+
"the existing data from source tables. If set to false, any existing data in the source " +
63+
"tables will be ignored and only changes happening after the pipeline started will be replicated.")
64+
private Boolean replicateExistingData;
65+
6066
public MySqlConfig(String host, int port, String user, String password, int consumerID,
6167
String database, @Nullable String serverTimezone) {
6268
this.host = host;
@@ -116,4 +122,8 @@ public Properties getConnectionProperties() {
116122
properties.put("serverTimezone", getServerTimezone());
117123
return properties;
118124
}
125+
126+
public boolean getReplicateExistingData() {
127+
return replicateExistingData != null ? replicateExistingData : true;
128+
}
119129
}

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ public void start(Offset offset) {
119119
.with("database.whitelist", config.getDatabase())
120120
.with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter
121121
.with("database.serverTimezone", config.getServerTimezone())
122-
.with("table.whitelist", String.join(",", sourceTableMap.keySet()));
122+
.with("table.whitelist", String.join(",", sourceTableMap.keySet()))
123+
.with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only");
123124

124125
if (config.getConsumerID() != null) {
125126
// If not provided debezium will randomly pick integer between 5400 and 6400.
@@ -157,7 +158,7 @@ public void start(Offset offset) {
157158
engine = EmbeddedEngine.create()
158159
.using(debeziumConf)
159160
.notifying(new MySqlRecordConsumer(context, emitter, ddlParser, mySqlValueConverters,
160-
new Tables(), sourceTableMap))
161+
new Tables(), sourceTableMap, config.getReplicateExistingData()))
161162
.using(new NotifyingCompletionCallback(context))
162163
.build();
163164
executorService.submit(engine);

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,18 @@ public class MySqlRecordConsumer implements Consumer<SourceRecord> {
5656
private final MySqlValueConverters mySqlValueConverters;
5757
private final Tables tables;
5858
private final Map<String, SourceTable> sourceTableMap;
59+
private final boolean replicateExistingData;
5960

6061
public MySqlRecordConsumer(DeltaSourceContext context, EventEmitter emitter,
6162
DdlParser ddlParser, MySqlValueConverters mySqlValueConverters,
62-
Tables tables, Map<String, SourceTable> sourceTableMap) {
63+
Tables tables, Map<String, SourceTable> sourceTableMap, boolean replicateExistingData) {
6364
this.context = context;
6465
this.emitter = emitter;
6566
this.ddlParser = ddlParser;
6667
this.mySqlValueConverters = mySqlValueConverters;
6768
this.tables = tables;
6869
this.sourceTableMap = sourceTableMap;
70+
this.replicateExistingData = replicateExistingData;
6971
}
7072

7173
@Override
@@ -260,7 +262,8 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
260262
case DROP_TABLE:
261263
DdlParserListener.TableDroppedEvent droppedEvent = (DdlParserListener.TableDroppedEvent) event;
262264
sourceTable = getSourceTable(databaseName, droppedEvent.tableId().table());
263-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.Type.DROP_TABLE)) {
265+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.Type.DROP_TABLE) &&
266+
generateDropEventOnSnapshot(isSnapshot)) {
264267
ddlEvent = builder.setOperation(DDLOperation.Type.DROP_TABLE)
265268
.setTableName(droppedEvent.tableId().table())
266269
.build();
@@ -281,7 +284,9 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
281284
}
282285
break;
283286
case DROP_DATABASE:
284-
ddlEvent = builder.setOperation(DDLOperation.Type.DROP_DATABASE).build();
287+
if (generateDropEventOnSnapshot(isSnapshot)) {
288+
ddlEvent = builder.setOperation(DDLOperation.Type.DROP_DATABASE).build();
289+
}
285290
break;
286291
case CREATE_DATABASE:
287292
// due to a bug in io.debezium.relational.ddl.AbstractDdlParser#signalDropDatabase
@@ -318,6 +323,19 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
318323
}
319324
}
320325

326+
// During the snapshotting process, the MySQL source generates DROP TABLE and DROP DATABASE
327+
// events. If the source is configured to ignore replication of the existing data, most likely the target table
328+
// already exists with the snapshot events applied and the user does not want to redo snapshotting. Do not generate
329+
// the DROP (TABLE/DATABASE) events in such cases. If the user does not want to keep any existing target tables, they
330+
// will have to delete those tables manually.
331+
private boolean generateDropEventOnSnapshot(boolean isSnapshot) {
332+
if (!isSnapshot) {
333+
// if not snapshot event, generate DROP event as it is part of explicit DDL operation from user
334+
return true;
335+
}
336+
return replicateExistingData;
337+
}
338+
321339
private boolean shouldEmitDdlEventForOperation(boolean readAllTables, SourceTable sourceTable, DDLOperation.Type op) {
322340
return (!sourceTableNotValid(readAllTables, sourceTable)) &&
323341
(!isDDLOperationBlacklisted(readAllTables, sourceTable, op));

mysql-delta-plugins/widgets/mysql-cdcSource.json

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,22 @@
6565
"widget-attributes": {
6666
"default": "UTC"
6767
}
68+
},
69+
{
70+
"name": "replicateExistingData",
71+
"label": "Replicate Existing Data",
72+
"widget-type": "toggle",
73+
"widget-attributes": {
74+
"on": {
75+
"value": "true",
76+
"label": "Yes"
77+
},
78+
"off": {
79+
"value": "false",
80+
"label": "No"
81+
},
82+
"default": "true"
83+
}
6884
}
6985
]
7086
}

sqlserver-delta-plugins/docs/sqlserver-cdcSource.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,7 @@ Plugin Properties
7070
**Database:** Database to replicate data from.
7171

7272
**JDBC Plugin Name:** Identifier for the SQLServer JDBC driver, which is the name used while uploading the SQLServer JDBC driver.
73+
74+
**Replicate Existing Data:** Whether to replicate existing data from the source database. By default, the pipeline
75+
replicates the existing data from the source tables. If set to false, any existing data in the source tables is
76+
ignored and only changes that happen after the pipeline starts are replicated.

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ public class SqlServerConfig extends PluginConfig {
5151
"JDBC driver.")
5252
private String jdbcPluginName;
5353

54+
@Nullable
55+
@Description("Whether to replicate existing data from the source database. By default, pipeline will replicate " +
56+
"the existing data from source tables. If set to false, any existing data in the source " +
57+
"tables will be ignored and only changes happening after the pipeline started will be replicated.")
58+
private Boolean replicateExistingData;
59+
5460
public SqlServerConfig(String host, int port, String user, String password,
5561
String database, @Nullable String serverTimezone, String jdbcPluginName) {
5662
this.host = host;
@@ -93,4 +99,8 @@ public String getServerTimezone() {
9399
public String getJDBCPluginId() {
94100
return String.format("%s.%s.%s", "sqlserversource", "jbdc", jdbcPluginName);
95101
}
102+
103+
public boolean getReplicateExistingData() {
104+
return replicateExistingData != null ? replicateExistingData : true;
105+
}
96106
}

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ public void start(Offset offset) {
125125
.with("database.dbname", databaseName)
126126
.with("table.whitelist", String.join(",", sourceTableMap.keySet()))
127127
.with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter
128-
.with("database.serverTimezone", config.getServerTimezone());
128+
.with("database.serverTimezone", config.getServerTimezone())
129+
.with("snapshot.mode", config.getReplicateExistingData() ? "initial" : "schema_only");
129130

130131
LOG.info("Overriding sql server connector configs with arguments {}", debeziumConnectorConfigs);
131132
for (Map.Entry<String, String> entry: debeziumConnectorConfigs.entrySet()) {
@@ -161,7 +162,8 @@ public void start(Offset offset) {
161162
LOG.info("creating new EmbeddedEngine...");
162163
// Create the engine with this configuration ...
163164
engine = EmbeddedEngine.create()
164-
.notifying(new SqlServerRecordConsumer(context, emitter, databaseName, ddlEventSent, sourceTableMap, offset))
165+
.notifying(new SqlServerRecordConsumer(context, emitter, databaseName, ddlEventSent, sourceTableMap, offset,
166+
config.getReplicateExistingData()))
165167
.using(debeziumConf)
166168
.using(new NotifyingCompletionCallback(context))
167169
.build();

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,20 @@ public class SqlServerRecordConsumer implements Consumer<SourceRecord> {
5454
// record tables that already had DDL events sent
5555
private final Set<String> ddlEventSent;
5656
private final Map<String, SourceTable> sourceTableMap;
57+
private final boolean replicateExistingData;
5758
private Offset latestOffset;
5859

5960

6061
SqlServerRecordConsumer(DeltaSourceContext context, EventEmitter emitter, String databaseName,
6162
Set<String> ddlEventSent, Map<String, SourceTable> sourceTableMap,
62-
Offset latestOffset) {
63+
Offset latestOffset, boolean replicateExistingData) {
6364
this.context = context;
6465
this.emitter = emitter;
6566
this.databaseName = databaseName;
6667
this.ddlEventSent = ddlEventSent;
6768
this.sourceTableMap = sourceTableMap;
6869
this.latestOffset = latestOffset;
70+
this.replicateExistingData = replicateExistingData;
6971
}
7072

7173
@Override
@@ -162,11 +164,13 @@ public void accept(SourceRecord sourceRecord) {
162164
builder.setOffset(recordOffset);
163165

164166
try {
165-
// try to always drop the table before snapshot the schema.
166-
emitter.emit(builder.setOperation(DDLOperation.Type.DROP_TABLE)
167-
.setTableName(tableName)
168-
.setSchemaName(schemaName)
169-
.build());
167+
if (replicateExistingData) {
168+
// try to always drop the table before snapshot the schema.
169+
emitter.emit(builder.setOperation(DDLOperation.Type.DROP_TABLE)
170+
.setTableName(tableName)
171+
.setSchemaName(schemaName)
172+
.build());
173+
}
170174

171175
// try to emit create database event before create table event
172176
emitter.emit(builder.setOperation(DDLOperation.Type.CREATE_DATABASE)

sqlserver-delta-plugins/widgets/sqlserver-cdcSource.json

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,22 @@
6060
"widget-attributes": {
6161
"default": "UTC"
6262
}
63+
},
64+
{
65+
"name": "replicateExistingData",
66+
"label": "Replicate Existing Data",
67+
"widget-type": "toggle",
68+
"widget-attributes": {
69+
"on": {
70+
"value": "true",
71+
"label": "Yes"
72+
},
73+
"off": {
74+
"value": "false",
75+
"label": "No"
76+
},
77+
"default": "true"
78+
}
6379
}
6480
]
6581
}

0 commit comments

Comments
 (0)