Skip to content

Commit 5986606

Browse files
authored
Merge pull request #103 from data-integrations/bugfix_release/CDAP-17370
CDAP-17370 Provide unique consumer id to each worker instance in the pipeline for mysql source.
2 parents 3e75f43 + 11c5744 commit 5986606

2 files changed

Lines changed: 4 additions & 2 deletions

File tree

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlConfig.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ public class MySqlConfig extends PluginConfig {
4040
@Description("Password to use to connect to the MySQL server.")
4141
private String password;
4242

43-
@Description("A unique numeric ID to identify this origin as an event consumer.")
43+
@Description("A unique numeric ID to identify this origin as an event consumer. When replication pipeline is " +
44+
"configured with multiple instances, each instance gets unique consumer id by adding instance id to " +
45+
"this supplied consumer id.")
4446
private int consumerID;
4547

4648
@Description("Database to consume events for.")

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void start(Offset offset) {
110110
.with("database.port", config.getPort())
111111
.with("database.user", config.getUser())
112112
.with("database.password", config.getPassword())
113-
.with("database.server.id", config.getConsumerID())
113+
.with("database.server.id", config.getConsumerID() + context.getInstanceId())
114114
.with("database.history", DBSchemaHistory.class.getName())
115115
.with("database.whitelist", config.getDatabase())
116116
.with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter

0 commit comments

Comments
 (0)