Skip to content

Commit c61985e

Browse files
committed
CDAP-17402 Upgrade debezium to 1.3.1.Final version.
1 parent a5857b2 commit c61985e

6 files changed

Lines changed: 663 additions & 726 deletions

File tree

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/DBSchemaHistory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public synchronized boolean exists() {
7878
}
7979
}
8080

81+
@Override
82+
public boolean storageExists() {
83+
return true;
84+
}
85+
8186
// TODO: cache history, should only have to read once
8287
private List<HistoryRecord> getHistory() {
8388
List<HistoryRecord> history = new ArrayList<>();

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlEventReader.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@
2525
import io.cdap.delta.api.SourceTable;
2626
import io.cdap.delta.plugin.common.DBSchemaHistory;
2727
import io.cdap.delta.plugin.common.NotifyingCompletionCallback;
28+
import io.debezium.DebeziumException;
29+
import io.debezium.config.CommonConnectorConfig;
2830
import io.debezium.config.Configuration;
2931
import io.debezium.connector.mysql.MySqlConnector;
3032
import io.debezium.connector.mysql.MySqlConnectorConfig;
3133
import io.debezium.connector.mysql.MySqlJdbcContext;
3234
import io.debezium.connector.mysql.MySqlValueConverters;
35+
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
3336
import io.debezium.embedded.EmbeddedEngine;
3437
import io.debezium.jdbc.JdbcConnection;
3538
import io.debezium.jdbc.JdbcValueConverters;
@@ -132,7 +135,7 @@ public void start(Offset offset) {
132135
}
133136

134137
MySqlValueConverters mySqlValueConverters = getValueConverters(mysqlConf);
135-
DdlParser ddlParser = mysqlConf.getDdlParsingMode().getNewParserInstance(mySqlValueConverters, tableId -> true);
138+
DdlParser ddlParser = new MySqlAntlrDdlParser(mySqlValueConverters, tableId -> true);
136139

137140
ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
138141
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
@@ -179,7 +182,11 @@ private static MySqlValueConverters getValueConverters(MySqlConnectorConfig conf
179182

180183
boolean timeAdjusterEnabled = configuration.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
181184
return new MySqlValueConverters(decimalMode, timePrecisionMode, bigIntUnsignedMode,
182-
timeAdjusterEnabled ? MySqlEventReader::adjustTemporal : x -> x);
185+
CommonConnectorConfig.BinaryHandlingMode.BYTES,
186+
timeAdjusterEnabled ? MySqlEventReader::adjustTemporal : x -> x,
187+
(message, exception) -> {
188+
throw new DebeziumException(message, exception);
189+
});
183190
}
184191

185192
private static Temporal adjustTemporal(Temporal temporal) {

0 commit comments

Comments
 (0)