@@ -200,8 +200,8 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Offset rec
200200 DMLEvent .Builder builder = DMLEvent .builder ()
201201 .setOffset (recordOffset )
202202 .setOperationType (op )
203- .setDatabase (databaseName )
204- .setTable (tableName )
203+ .setDatabaseName (databaseName )
204+ .setTableName (tableName )
205205 .setTransactionId (transactionId )
206206 .setIngestTimestamp (ingestTime )
207207 .setSnapshot (isSnapshot );
@@ -221,14 +221,14 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
221221 ddlParser .getDdlChanges ().reset ();
222222 ddlParser .parse (ddlStatement , tables );
223223 AtomicReference <InterruptedException > interrupted = new AtomicReference <>();
224- ddlParser .getDdlChanges ().groupEventsByDatabase ((database , events ) -> {
224+ ddlParser .getDdlChanges ().groupEventsByDatabase ((databaseName , events ) -> {
225225 if (interrupted .get () != null ) {
226226 return ;
227227 }
228228 for (DdlParserListener .Event event : events ) {
229229 DDLEvent .Builder builder = DDLEvent .builder ()
230230 .setOffset (recordOffset )
231- .setDatabase ( database )
231+ .setDatabaseName ( databaseName )
232232 .setSnapshot (isSnapshot );
233233 DDLEvent ddlEvent = null ;
234234 // since current ddl blacklist implementation is bind with table level, we will only do the ddl blacklist
@@ -239,18 +239,18 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
239239 DdlParserListener .TableAlteredEvent alteredEvent = (DdlParserListener .TableAlteredEvent ) event ;
240240 TableId tableId = alteredEvent .tableId ();
241241 Table table = tables .forTable (tableId );
242- SourceTable sourceTable = getSourceTable (database , tableId .table ());
242+ SourceTable sourceTable = getSourceTable (databaseName , tableId .table ());
243243 DDLOperation .Type ddlOp ;
244244 if (alteredEvent .previousTableId () != null ) {
245245 ddlOp = DDLOperation .Type .RENAME_TABLE ;
246- builder .setPrevTable (alteredEvent .previousTableId ().table ());
246+ builder .setPrevTableName (alteredEvent .previousTableId ().table ());
247247 } else {
248248 ddlOp = DDLOperation .Type .ALTER_TABLE ;
249249 }
250250
251251 if (shouldEmitDdlEventForOperation (readAllTables , sourceTable , ddlOp )) {
252252 ddlEvent = builder .setOperation (ddlOp )
253- .setTable (tableId .table ())
253+ .setTableName (tableId .table ())
254254 .setSchema (readAllTables ? Records .getSchema (table , mySqlValueConverters ) :
255255 Records .getSchema (table , mySqlValueConverters , sourceTable .getColumns ()))
256256 .setPrimaryKey (table .primaryKeyColumnNames ())
@@ -259,21 +259,21 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
259259 break ;
260260 case DROP_TABLE :
261261 DdlParserListener .TableDroppedEvent droppedEvent = (DdlParserListener .TableDroppedEvent ) event ;
262- sourceTable = getSourceTable (database , droppedEvent .tableId ().table ());
262+ sourceTable = getSourceTable (databaseName , droppedEvent .tableId ().table ());
263263 if (shouldEmitDdlEventForOperation (readAllTables , sourceTable , DDLOperation .Type .DROP_TABLE )) {
264264 ddlEvent = builder .setOperation (DDLOperation .Type .DROP_TABLE )
265- .setTable (droppedEvent .tableId ().table ())
265+ .setTableName (droppedEvent .tableId ().table ())
266266 .build ();
267267 }
268268 break ;
269269 case CREATE_TABLE :
270270 DdlParserListener .TableCreatedEvent createdEvent = (DdlParserListener .TableCreatedEvent ) event ;
271271 tableId = createdEvent .tableId ();
272272 table = tables .forTable (tableId );
273- sourceTable = getSourceTable (database , tableId .table ());
273+ sourceTable = getSourceTable (databaseName , tableId .table ());
274274 if (shouldEmitDdlEventForOperation (readAllTables , sourceTable , DDLOperation .Type .CREATE_TABLE )) {
275275 ddlEvent = builder .setOperation (DDLOperation .Type .CREATE_TABLE )
276- .setTable (tableId .table ())
276+ .setTableName (tableId .table ())
277277 .setSchema (readAllTables ? Records .getSchema (table , mySqlValueConverters ) :
278278 Records .getSchema (table , mySqlValueConverters , sourceTable .getColumns ()))
279279 .setPrimaryKey (table .primaryKeyColumnNames ())
@@ -296,10 +296,10 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
296296 case TRUNCATE_TABLE :
297297 DdlParserListener .TableTruncatedEvent truncatedEvent =
298298 (DdlParserListener .TableTruncatedEvent ) event ;
299- sourceTable = getSourceTable (database , truncatedEvent .tableId ().table ());
299+ sourceTable = getSourceTable (databaseName , truncatedEvent .tableId ().table ());
300300 if (shouldEmitDdlEventForOperation (readAllTables , sourceTable , DDLOperation .Type .TRUNCATE_TABLE )) {
301301 ddlEvent = builder .setOperation (DDLOperation .Type .TRUNCATE_TABLE )
302- .setTable (truncatedEvent .tableId ().table ())
302+ .setTableName (truncatedEvent .tableId ().table ())
303303 .build ();
304304 }
305305 break ;
0 commit comments