Skip to content

Commit 11c5744

Browse files
committed
CDAP-17370 PRovide unique consumer id to each worker instance in the pipeline for mysql source.
1 parent 8e82e2e commit 11c5744

2 files changed

Lines changed: 4 additions & 2 deletions

File tree

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlConfig.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ public class MySqlConfig extends PluginConfig {
4040
@Description("Password to use to connect to the MySQL server.")
4141
private String password;
4242

43-
@Description("A unique numeric ID to identify this origin as an event consumer.")
43+
@Description("A unique numeric ID to identify this origin as an event consumer. When replication pipeline is " +
44+
"configured with multiple instances, each instance gets unique consumer id by adding instance id to " +
45+
"this supplied consumer id.")
4446
private int consumerID;
4547

4648
@Description("Database to consume events for.")

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public void start(Offset offset) {
107107
.with("database.port", config.getPort())
108108
.with("database.user", config.getUser())
109109
.with("database.password", config.getPassword())
110-
.with("database.server.id", config.getConsumerID())
110+
.with("database.server.id", config.getConsumerID() + context.getInstanceId())
111111
.with("database.history", DBSchemaHistory.class.getName())
112112
.with("database.whitelist", config.getDatabase())
113113
.with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter

0 commit comments

Comments
 (0)