@@ -93,7 +93,7 @@ public void start(Offset offset) {
9393 Map <String , SourceTable > sourceTableMap = sourceTables .stream ().collect (
9494 Collectors .toMap (t -> config .getDatabase () + "." + t .getTable (), t -> t ));
9595 Map <String , String > state = offset .get (); // state map is always not null
96- Configuration debeziumConf = Configuration .create ()
96+ Configuration . Builder configBuilder = Configuration .create ()
9797 .with ("connector.class" , MySqlConnector .class .getName ())
9898 .with ("offset.storage" , MySqlConstantOffsetBackingStore .class .getName ())
9999 .with ("offset.flush.interval.ms" , 1000 )
@@ -110,13 +110,18 @@ public void start(Offset offset) {
110110 .with ("database.port" , config .getPort ())
111111 .with ("database.user" , config .getUser ())
112112 .with ("database.password" , config .getPassword ())
113- .with ("database.server.id" , config .getConsumerID () + context .getInstanceId ())
114113 .with ("database.history" , DBSchemaHistory .class .getName ())
115114 .with ("database.whitelist" , config .getDatabase ())
116115 .with ("database.server.name" , "dummy" ) // this is the kafka topic for hosted debezium - it doesn't matter
117116 .with ("database.serverTimezone" , config .getServerTimezone ())
118- .with ("table.whitelist" , String .join ("," , sourceTableMap .keySet ()))
119- .build ();
117+ .with ("table.whitelist" , String .join ("," , sourceTableMap .keySet ()));
118+
119+ if (config .getConsumerID () != null ) {
120+ // If not provided debezium will randomly pick integer between 5400 and 6400.
121+ configBuilder = configBuilder .with ("database.server.id" , config .getConsumerID () + context .getInstanceId ());
122+ }
123+
124+ Configuration debeziumConf = configBuilder .build ();
120125 MySqlConnectorConfig mysqlConf = new MySqlConnectorConfig (debeziumConf );
121126 DBSchemaHistory .deltaRuntimeContext = context ;
122127 /*
0 commit comments