Skip to content

Commit d559e16

Browse files
committed
CDAP-17472 Make consumer id optional for mysql plugin.
1 parent 775cf5c commit d559e16

4 files changed

Lines changed: 24 additions & 17 deletions

File tree

mysql-delta-plugins/docs/mysql-cdcSource.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ Plugin Properties
5858

5959
**Port:** Port to use to connect to the MySQL server.
6060

61-
**Consumer ID:** An unique numeric ID to identify this origin as an event consumer. This number cannot be the same as
62-
another Delta Replicator that is reading from the server, and it cannot be the same as the server-id for any MySQL
63-
slave that is replicating from the server.
61+
**Consumer ID:** Optional unique numeric ID to identify this origin as an event consumer. This number cannot be the
62+
same as another Delta Replicator that is reading from the server, and it cannot be the same as the server-id for any
63+
MySQL slave that is replicating from the server. By default, random number will be used.
6464

6565
**Server Timezone:** Timezone of the MySQL server. This is used when converting dates into timestamps.
6666

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlConfig.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ public class MySqlConfig extends PluginConfig {
4141
@Description("Password to use to connect to the MySQL server.")
4242
private String password;
4343

44-
@Description("A unique numeric ID to identify this origin as an event consumer. When replication pipeline is " +
45-
"configured with multiple instances, each instance gets unique consumer id by adding instance id to " +
46-
"this supplied consumer id.")
47-
private int consumerID;
44+
@Nullable
45+
@Description("An optional unique numeric ID to identify this origin as an event consumer. When replication " +
46+
"pipeline is configured with multiple instances, each instance gets unique consumer id by adding instance id to " +
47+
"this supplied consumer id. By default, random number will be used.")
48+
private Integer consumerID;
4849

4950
@Description("Database to replicate data from.")
5051
private String database;
@@ -83,7 +84,8 @@ public String getPassword() {
8384
return password;
8485
}
8586

86-
public int getConsumerID() {
87+
@Nullable
88+
public Integer getConsumerID() {
8789
return consumerID;
8890
}
8991

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void start(Offset offset) {
9393
Map<String, SourceTable> sourceTableMap = sourceTables.stream().collect(
9494
Collectors.toMap(t -> config.getDatabase() + "." + t.getTable(), t -> t));
9595
Map<String, String> state = offset.get(); // state map is always not null
96-
Configuration debeziumConf = Configuration.create()
96+
Configuration.Builder configBuilder = Configuration.create()
9797
.with("connector.class", MySqlConnector.class.getName())
9898
.with("offset.storage", MySqlConstantOffsetBackingStore.class.getName())
9999
.with("offset.flush.interval.ms", 1000)
@@ -110,13 +110,18 @@ public void start(Offset offset) {
110110
.with("database.port", config.getPort())
111111
.with("database.user", config.getUser())
112112
.with("database.password", config.getPassword())
113-
.with("database.server.id", config.getConsumerID() + context.getInstanceId())
114113
.with("database.history", DBSchemaHistory.class.getName())
115114
.with("database.whitelist", config.getDatabase())
116115
.with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter
117116
.with("database.serverTimezone", config.getServerTimezone())
118-
.with("table.whitelist", String.join(",", sourceTableMap.keySet()))
119-
.build();
117+
.with("table.whitelist", String.join(",", sourceTableMap.keySet()));
118+
119+
if (config.getConsumerID() != null) {
120+
// If not provided debezium will randomly pick integer between 5400 and 6400.
121+
configBuilder = configBuilder.with("database.server.id", config.getConsumerID() + context.getInstanceId());
122+
}
123+
124+
Configuration debeziumConf = configBuilder.build();
120125
MySqlConnectorConfig mysqlConf = new MySqlConnectorConfig(debeziumConf);
121126
DBSchemaHistory.deltaRuntimeContext = context;
122127
/*

mysql-delta-plugins/widgets/mysql-cdcSource.json

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,6 @@
2020
"default": "3306"
2121
}
2222
},
23-
{
24-
"name": "consumerID",
25-
"label": "Consumer ID",
26-
"widget-type": "textbox"
27-
},
2823
{
2924
"name": "jdbcPluginName",
3025
"label": "JDBC Plugin Name",
@@ -58,6 +53,11 @@
5853
{
5954
"label": "Advanced",
6055
"properties": [
56+
{
57+
"name": "consumerID",
58+
"label": "Consumer ID",
59+
"widget-type": "textbox"
60+
},
6161
{
6262
"name": "serverTimezone",
6363
"label": "Server Timezone",

0 commit comments

Comments
 (0)