66
77package io .debezium .connector .sqlserver ;
88
9+ import io .debezium .config .CommonConnectorConfig ;
10+ import io .debezium .config .Configuration ;
11+ import io .debezium .jdbc .JdbcConfiguration ;
12+ import io .debezium .jdbc .JdbcConnection ;
13+ import io .debezium .relational .Column ;
14+ import io .debezium .relational .ColumnEditor ;
15+ import io .debezium .relational .Table ;
16+ import io .debezium .relational .TableId ;
17+ import io .debezium .util .BoundedConcurrentHashMap ;
18+ import io .debezium .util .Clock ;
19+ import org .slf4j .Logger ;
20+ import org .slf4j .LoggerFactory ;
21+
922import java .sql .DatabaseMetaData ;
1023import java .sql .ResultSet ;
1124import java .sql .SQLException ;
2437import java .util .regex .Pattern ;
2538import java .util .stream .Collectors ;
2639
27- import org .slf4j .Logger ;
28- import org .slf4j .LoggerFactory ;
29-
30- import io .debezium .config .CommonConnectorConfig ;
31- import io .debezium .config .Configuration ;
32- import io .debezium .jdbc .JdbcConfiguration ;
33- import io .debezium .jdbc .JdbcConnection ;
34- import io .debezium .relational .Column ;
35- import io .debezium .relational .ColumnEditor ;
36- import io .debezium .relational .Table ;
37- import io .debezium .relational .TableId ;
38- import io .debezium .util .BoundedConcurrentHashMap ;
39- import io .debezium .util .Clock ;
40-
4140/**
4241 * {@link JdbcConnection} extension to be used with Microsoft SQL Server
4342 *
@@ -60,15 +59,18 @@ public class SqlServerConnection extends JdbcConnection {
6059 private static final String SQL_SERVER_VERSION = "SELECT @@VERSION AS 'SQL Server Version'" ;
6160 private final String lsnToTimestamp ;
6261 private static final String INCREMENT_LSN = "SELECT sys.fn_cdc_increment_lsn(?)" ;
63- private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT * FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old')" ;
62+ private static final String GET_ALL_CHANGES_FOR_TABLE
63+ = "SELECT * FROM cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old')" ;
6464 private static final String GET_LIST_OF_CDC_ENABLED_TABLES = "EXEC sys.sp_cdc_help_change_data_capture" ;
65- private static final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "SELECT * FROM cdc.change_tables WHERE start_lsn BETWEEN ? AND ?" ;
65+ private static final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES
66+ = "SELECT * FROM cdc.change_tables WHERE start_lsn BETWEEN ? AND ?" ;
6667 private static final String GET_LIST_OF_KEY_COLUMNS = "SELECT * FROM cdc.index_columns WHERE object_id=?" ;
6768 private static final Pattern BRACKET_PATTERN = Pattern .compile ("[\\ [\\ ]]" );
6869
6970 private static final int CHANGE_TABLE_DATA_COLUMN_OFFSET = 5 ;
7071
71- private static final String URL_PATTERN = "jdbc:sqlserver://${" + JdbcConfiguration .HOSTNAME + "}:${" + JdbcConfiguration .PORT + "};databaseName=${"
72+ private static final String URL_PATTERN = "jdbc:sqlserver://${" + JdbcConfiguration .HOSTNAME + "}:${"
73+ + JdbcConfiguration .PORT + "};databaseName=${"
7274 + JdbcConfiguration .DATABASE + "}" ;
7375
7476 // Note: this line is the only change from the original file. Renamed variable to factory and
@@ -95,7 +97,8 @@ public class SqlServerConnection extends JdbcConnection {
9597 * @param sourceTimestampMode strategy for populating {@code source.ts_ms}.
9698 * @param valueConverters {@link SqlServerValueConverters} instance
9799 */
98- public SqlServerConnection (Configuration config , Clock clock , SourceTimestampMode sourceTimestampMode , SqlServerValueConverters valueConverters ) {
100+ public SqlServerConnection (Configuration config , Clock clock , SourceTimestampMode sourceTimestampMode ,
101+ SqlServerValueConverters valueConverters ) {
99102 this (config , clock , sourceTimestampMode , valueConverters , null );
100103 }
101104
@@ -108,9 +111,10 @@ public SqlServerConnection(Configuration config, Clock clock, SourceTimestampMod
108111 * @param valueConverters {@link SqlServerValueConverters} instance
109112 * @param classLoaderSupplier class loader supplier
110113 */
111- public SqlServerConnection (Configuration config , Clock clock , SourceTimestampMode sourceTimestampMode , SqlServerValueConverters valueConverters ,
114+ public SqlServerConnection (Configuration config , Clock clock , SourceTimestampMode sourceTimestampMode ,
115+ SqlServerValueConverters valueConverters ,
112116 Supplier <ClassLoader > classLoaderSupplier ) {
113- super (config , FACTORY , classLoaderSupplier );
117+ super (config , factory , classLoaderSupplier );
114118 lsnToInstantCache = new BoundedConcurrentHashMap <>(100 );
115119 realDatabaseName = retrieveRealDatabaseName ();
116120 boolean supportsAtTimeZone = supportsAtTimeZone ();
@@ -172,7 +176,8 @@ public Lsn getMinLsn(String changeTableName) throws SQLException {
172176 * @param consumer - the change processor
173177 * @throws SQLException
174178 */
175- public void getChangesForTable (TableId tableId , Lsn fromLsn , Lsn toLsn , ResultSetConsumer consumer ) throws SQLException {
179+ public void getChangesForTable (TableId tableId , Lsn fromLsn , Lsn toLsn ,
180+ ResultSetConsumer consumer ) throws SQLException {
176181 final String query = GET_ALL_CHANGES_FOR_TABLE .replace (STATEMENTS_PLACEHOLDER , cdcNameForTable (tableId ));
177182 prepareQuery (query , statement -> {
178183 statement .setBytes (1 , fromLsn .getBinary ());
@@ -189,7 +194,8 @@ public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSe
189194 * @param consumer - the change processor
190195 * @throws SQLException
191196 */
192- public void getChangesForTables (SqlServerChangeTable [] changeTables , Lsn intervalFromLsn , Lsn intervalToLsn , BlockingMultiResultSetConsumer consumer )
197+ public void getChangesForTables (SqlServerChangeTable [] changeTables , Lsn intervalFromLsn , Lsn intervalToLsn ,
198+ BlockingMultiResultSetConsumer consumer )
193199 throws SQLException , InterruptedException {
194200 final String [] queries = new String [changeTables .length ];
195201 final StatementPreparer [] preparers = new StatementPreparer [changeTables .length ];
@@ -216,7 +222,8 @@ public void getChangesForTables(SqlServerChangeTable[] changeTables, Lsn interva
216222 }
217223
218224 private Lsn getFromLsn (SqlServerChangeTable changeTable , Lsn intervalFromLsn ) throws SQLException {
219- Lsn fromLsn = changeTable .getStartLsn ().compareTo (intervalFromLsn ) > 0 ? changeTable .getStartLsn () : intervalFromLsn ;
225+ Lsn fromLsn = changeTable .getStartLsn ().compareTo (intervalFromLsn ) > 0 ? changeTable .getStartLsn () :
226+ intervalFromLsn ;
220227 return fromLsn .getBinary () != null ? fromLsn : getMinLsn (changeTable .getCaptureInstance ());
221228 }
222229
@@ -303,6 +310,9 @@ private String cdcNameForTable(TableId tableId) {
303310 return tableId .schema () + '_' + tableId .table ();
304311 }
305312
313+ /**
314+ *
315+ */
306316 public static class CdcEnabledTable {
307317 private final String tableId ;
308318 private final String captureName ;
@@ -408,7 +418,8 @@ public Table getTableSchemaFromChangeTable(SqlServerChangeTable changeTable) thr
408418 }
409419
410420 // The first 5 columns and the last column of the change table are CDC metadata
411- final List <Column > columns = columnEditors .subList (CHANGE_TABLE_DATA_COLUMN_OFFSET , columnEditors .size () - 1 ).stream ()
421+ final List <Column > columns = columnEditors .subList (CHANGE_TABLE_DATA_COLUMN_OFFSET ,
422+ columnEditors .size () - 1 ).stream ()
412423 .map (c -> c .position (c .position () - CHANGE_TABLE_DATA_COLUMN_OFFSET ).create ())
413424 .collect (Collectors .toList ());
414425
@@ -439,13 +450,14 @@ private ZoneId retrieveTransactionTimezone(boolean supportsAtTimeZone) {
439450
440451 if (supportsAtTimeZone ) {
441452 if (serverTimezoneConfig != null ) {
442- LOGGER .warn ("The '{}' option should not be specified with SQL Server 2016 and newer" , SERVER_TIMEZONE_PROP_NAME );
453+ LOGGER .warn ("The '{}' option should not be specified with SQL Server 2016 and newer" ,
454+ SERVER_TIMEZONE_PROP_NAME );
443455 }
444- }
445- else {
456+ } else {
446457 if (serverTimezoneConfig == null ) {
447458 LOGGER .warn (
448- "The '{}' option should be specified to avoid incorrect timestamp values in case of different timezones between the database server and this connector's JVM." ,
459+ "The '{}' option should be specified to avoid incorrect timestamp values in case of different " +
460+ "timezones between the database server and this connector's JVM." ,
449461 SERVER_TIMEZONE_PROP_NAME );
450462 }
451463 }
@@ -461,8 +473,7 @@ private String retrieveRealDatabaseName() {
461473 return queryAndMap (
462474 GET_DATABASE_NAME ,
463475 singleResultMapper (rs -> rs .getString (1 ), "Could not retrieve database name" ));
464- }
465- catch (SQLException e ) {
476+ } catch (SQLException e ) {
466477 throw new RuntimeException ("Couldn't obtain database name" , e );
467478 }
468479 }
@@ -474,8 +485,7 @@ private boolean supportsAtTimeZone() {
474485 try {
475486 // Always expect the support if database is not standalone SQL Server, e.g. Azure
476487 return getSqlServerVersion ().orElse (Integer .MAX_VALUE ) > 2016 ;
477- }
478- catch (Exception e ) {
488+ } catch (Exception e ) {
479489 LOGGER .error ("Couldn't obtain database server version; assuming 'AT TIME ZONE' is not supported." , e );
480490 return false ;
481491 }
@@ -492,8 +502,7 @@ private Optional<Integer> getSqlServerVersion() {
492502 return Optional .empty ();
493503 }
494504 return Optional .of (Integer .valueOf (version .substring (21 , 25 )));
495- }
496- catch (Exception e ) {
505+ } catch (Exception e ) {
497506 throw new RuntimeException ("Couldn't obtain database server version" , e );
498507 }
499508 }
0 commit comments