Skip to content

Commit 3e75f43

Browse files
authored
Merge pull request #109 from data-integrations/bugfix_release/upgrade-debezium-version
[CDAP-17402] Upgrade debizum version to 1.3.1.Final
2 parents a5857b2 + b377ab1 commit 3e75f43

8 files changed

Lines changed: 773 additions & 619 deletions

File tree

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/DBSchemaHistory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public synchronized boolean exists() {
7878
}
7979
}
8080

81+
@Override
82+
public boolean storageExists() {
83+
return true;
84+
}
85+
8186
// TODO: cache history, should only have to read once
8287
private List<HistoryRecord> getHistory() {
8388
List<HistoryRecord> history = new ArrayList<>();

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/Records.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,8 @@ public static Schema convert(org.apache.kafka.connect.data.Schema schema) {
304304
for (Field field : fields) {
305305
cdapFields.add(Schema.Field.of(field.name(), convert(field.schema())));
306306
}
307-
converted = Schema.recordOf(schema.name(), cdapFields);
307+
308+
converted = Schema.recordOf(schema.name() == null ? "dummy.schema.name" : schema.name(), cdapFields);
308309
break;
309310
default:
310311
// should never happen, all values are listed above

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@
2525
import io.cdap.delta.api.SourceTable;
2626
import io.cdap.delta.plugin.common.DBSchemaHistory;
2727
import io.cdap.delta.plugin.common.NotifyingCompletionCallback;
28+
import io.debezium.DebeziumException;
29+
import io.debezium.config.CommonConnectorConfig;
2830
import io.debezium.config.Configuration;
2931
import io.debezium.connector.mysql.MySqlConnector;
3032
import io.debezium.connector.mysql.MySqlConnectorConfig;
3133
import io.debezium.connector.mysql.MySqlJdbcContext;
3234
import io.debezium.connector.mysql.MySqlValueConverters;
35+
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
3336
import io.debezium.embedded.EmbeddedEngine;
3437
import io.debezium.jdbc.JdbcConnection;
3538
import io.debezium.jdbc.JdbcValueConverters;
@@ -132,7 +135,7 @@ public void start(Offset offset) {
132135
}
133136

134137
MySqlValueConverters mySqlValueConverters = getValueConverters(mysqlConf);
135-
DdlParser ddlParser = mysqlConf.getDdlParsingMode().getNewParserInstance(mySqlValueConverters, tableId -> true);
138+
DdlParser ddlParser = new MySqlAntlrDdlParser(mySqlValueConverters, tableId -> true);
136139

137140
ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
138141
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
@@ -179,7 +182,11 @@ private static MySqlValueConverters getValueConverters(MySqlConnectorConfig conf
179182

180183
boolean timeAdjusterEnabled = configuration.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
181184
return new MySqlValueConverters(decimalMode, timePrecisionMode, bigIntUnsignedMode,
182-
timeAdjusterEnabled ? MySqlEventReader::adjustTemporal : x -> x);
185+
CommonConnectorConfig.BinaryHandlingMode.BYTES,
186+
timeAdjusterEnabled ? MySqlEventReader::adjustTemporal : x -> x,
187+
(message, exception) -> {
188+
throw new DebeziumException(message, exception);
189+
});
183190
}
184191

185192
private static Temporal adjustTemporal(Temporal temporal) {

0 commit comments

Comments
 (0)