Skip to content

Commit 9830784

Browse files
committed
CDAP-17582 Allow passing the debezium connector configurations through runtime arguments.
1 parent 0cc46fe commit 9830784

4 files changed

Lines changed: 110 additions & 3 deletions

File tree

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright © 2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.delta.plugin.common;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
/**
23+
* Utility class for manipulating runtime arguments.
24+
*/
25+
public final class RuntimeArguments {
26+
private RuntimeArguments() {
27+
}
28+
29+
/**
30+
* Identifies arguments with a given prefix, removes them from the arguments and adds new arguments back without
31+
* the prefix. Arguments which do not start with the given prefix will be ignored.
32+
*
33+
* @param prefix prefix to look for in arguments
34+
* @param arguments the runtime arguments
35+
* @return a new map that contains the arguments without prefix
36+
*/
37+
public static Map<String, String> extractPrefixed(String prefix, Map<String, String> arguments) {
38+
Map<String, String> result = new HashMap<>();
39+
for (Map.Entry<String, String> entry : arguments.entrySet()) {
40+
if (entry.getKey().startsWith(prefix)) {
41+
result.put(entry.getKey().substring(prefix.length()), entry.getValue());
42+
}
43+
}
44+
return result;
45+
}
46+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright © 2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.delta.plugin.common;
18+
19+
import com.google.common.collect.ImmutableMap;
20+
import org.junit.Assert;
21+
import org.junit.Test;
22+
23+
import java.util.Map;
24+
25+
/**
26+
* Test for RuntimeArguments.
27+
*/
28+
public class RuntimeArgumentsTest {
29+
30+
@Test
31+
public void testRemovePrefix() {
32+
Map<String, String> arguments = ImmutableMap.of("source.connector.database.host", "1.1.1.1",
33+
"source.connector.name", "somename",
34+
"somekey", "somevalue");
35+
Map<String, String> expected = ImmutableMap.of("database.host", "1.1.1.1",
36+
"name", "somename");
37+
38+
Assert.assertEquals(expected, RuntimeArguments.extractPrefixed("source.connector.", arguments));
39+
}
40+
}

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.cdap.delta.api.SourceTable;
2525
import io.cdap.delta.plugin.common.DBSchemaHistory;
2626
import io.cdap.delta.plugin.common.NotifyingCompletionCallback;
27+
import io.cdap.delta.plugin.common.RuntimeArguments;
2728
import io.debezium.DebeziumException;
2829
import io.debezium.config.CommonConnectorConfig;
2930
import io.debezium.config.Configuration;
@@ -58,11 +59,13 @@
5859
*/
5960
public class MySqlEventReader implements EventReader {
6061
public static final Logger LOG = LoggerFactory.getLogger(MySqlEventReader.class);
62+
private static final String SOURCE_CONNECTOR_PREFIX = "source.connector.";
6163
private final MySqlConfig config;
6264
private final EventEmitter emitter;
6365
private final ExecutorService executorService;
6466
private final DeltaSourceContext context;
6567
private final Set<SourceTable> sourceTables;
68+
private final Map<String, String> debeziumConnectorConfigs;
6669
private EmbeddedEngine engine;
6770
private volatile boolean failedToStop;
6871

@@ -74,6 +77,8 @@ public MySqlEventReader(Set<SourceTable> sourceTables, MySqlConfig config,
7477
this.emitter = emitter;
7578
this.executorService = Executors.newSingleThreadScheduledExecutor();
7679
this.failedToStop = false;
80+
this.debeziumConnectorConfigs = RuntimeArguments.extractPrefixed(SOURCE_CONNECTOR_PREFIX,
81+
context.getRuntimeArguments());
7782
}
7883

7984
@Override
@@ -121,6 +126,11 @@ public void start(Offset offset) {
121126
configBuilder = configBuilder.with("database.server.id", config.getConsumerID() + context.getInstanceId());
122127
}
123128

129+
LOG.info("Overriding mysql connector configs with arguments {}", debeziumConnectorConfigs);
130+
for (Map.Entry<String, String> entry: debeziumConnectorConfigs.entrySet()) {
131+
configBuilder = configBuilder.with(entry.getKey(), entry.getValue());
132+
}
133+
124134
Configuration debeziumConf = configBuilder.build();
125135
MySqlConnectorConfig mysqlConf = new MySqlConnectorConfig(debeziumConf);
126136
DBSchemaHistory.deltaRuntimeContext = context;

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerEventReader.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.cdap.delta.api.SourceTable;
2525
import io.cdap.delta.plugin.common.DBSchemaHistory;
2626
import io.cdap.delta.plugin.common.NotifyingCompletionCallback;
27+
import io.cdap.delta.plugin.common.RuntimeArguments;
2728
import io.debezium.config.Configuration;
2829
import io.debezium.connector.sqlserver.SourceInfo;
2930
import io.debezium.connector.sqlserver.SqlServerConnection;
@@ -52,11 +53,13 @@
5253
*/
5354
public class SqlServerEventReader implements EventReader {
5455
private static final Logger LOG = LoggerFactory.getLogger(SqlServerEventReader.class);
56+
private static final String SOURCE_CONNECTOR_PREFIX = "source.connector.";
5557
private final SqlServerConfig config;
5658
private final EventEmitter emitter;
5759
private final DeltaSourceContext context;
5860
private final ExecutorService executorService;
5961
private final Set<SourceTable> tables;
62+
private final Map<String, String> debeziumConnectorConfigs;
6063
private volatile boolean failedStopping;
6164
private EmbeddedEngine engine;
6265

@@ -68,6 +71,8 @@ public SqlServerEventReader(Set<SourceTable> tables, SqlServerConfig config,
6871
this.tables = tables;
6972
this.executorService = Executors.newSingleThreadScheduledExecutor();
7073
this.failedStopping = false;
74+
this.debeziumConnectorConfigs = RuntimeArguments.extractPrefixed(SOURCE_CONNECTOR_PREFIX,
75+
context.getRuntimeArguments());
7176
}
7277

7378
@Override
@@ -101,7 +106,7 @@ public void start(Offset offset) {
101106
Map<String, String> state = offset.get(); // this will never be null
102107
// offset config
103108
String isSnapshotCompleted = state.getOrDefault(SqlServerConstantOffsetBackingStore.SNAPSHOT_COMPLETED, "");
104-
Configuration debeziumConf = Configuration.create()
109+
Configuration.Builder configBuilder = Configuration.create()
105110
.with("connector.class", SqlServerConnector.class.getName())
106111
.with("offset.storage", SqlServerConstantOffsetBackingStore.class.getName())
107112
.with("offset.flush.interval.ms", 1000)
@@ -120,8 +125,14 @@ public void start(Offset offset) {
120125
.with("database.dbname", databaseName)
121126
.with("table.whitelist", String.join(",", sourceTableMap.keySet()))
122127
.with("database.server.name", "dummy") // this is the kafka topic for hosted debezium - it doesn't matter
123-
.with("database.serverTimezone", config.getServerTimezone())
124-
.build();
128+
.with("database.serverTimezone", config.getServerTimezone());
129+
130+
LOG.info("Overriding sql server connector configs with arguments {}", debeziumConnectorConfigs);
131+
for (Map.Entry<String, String> entry: debeziumConnectorConfigs.entrySet()) {
132+
configBuilder = configBuilder.with(entry.getKey(), entry.getValue());
133+
}
134+
135+
Configuration debeziumConf = configBuilder.build();
125136

126137
DBSchemaHistory.deltaRuntimeContext = context;
127138

0 commit comments

Comments
 (0)