@@ -56,16 +56,18 @@ public class MySqlRecordConsumer implements Consumer<SourceRecord> {
5656 private final MySqlValueConverters mySqlValueConverters ;
5757 private final Tables tables ;
5858 private final Map <String , SourceTable > sourceTableMap ;
59+ private final boolean replicateExistingData ;
5960
6061 public MySqlRecordConsumer (DeltaSourceContext context , EventEmitter emitter ,
6162 DdlParser ddlParser , MySqlValueConverters mySqlValueConverters ,
62- Tables tables , Map <String , SourceTable > sourceTableMap ) {
63+ Tables tables , Map <String , SourceTable > sourceTableMap , boolean replicateExistingData ) {
6364 this .context = context ;
6465 this .emitter = emitter ;
6566 this .ddlParser = ddlParser ;
6667 this .mySqlValueConverters = mySqlValueConverters ;
6768 this .tables = tables ;
6869 this .sourceTableMap = sourceTableMap ;
70+ this .replicateExistingData = replicateExistingData ;
6971 }
7072
7173 @ Override
@@ -260,7 +262,8 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
260262 case DROP_TABLE :
261263 DdlParserListener .TableDroppedEvent droppedEvent = (DdlParserListener .TableDroppedEvent ) event ;
262264 sourceTable = getSourceTable (databaseName , droppedEvent .tableId ().table ());
263- if (shouldEmitDdlEventForOperation (readAllTables , sourceTable , DDLOperation .Type .DROP_TABLE )) {
265+ if (shouldEmitDdlEventForOperation (readAllTables , sourceTable , DDLOperation .Type .DROP_TABLE ) &&
266+ generateDropEventOnSnapshot (isSnapshot )) {
264267 ddlEvent = builder .setOperation (DDLOperation .Type .DROP_TABLE )
265268 .setTableName (droppedEvent .tableId ().table ())
266269 .build ();
@@ -281,7 +284,9 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
281284 }
282285 break ;
283286 case DROP_DATABASE :
284- ddlEvent = builder .setOperation (DDLOperation .Type .DROP_DATABASE ).build ();
287+ if (generateDropEventOnSnapshot (isSnapshot )) {
288+ ddlEvent = builder .setOperation (DDLOperation .Type .DROP_DATABASE ).build ();
289+ }
285290 break ;
286291 case CREATE_DATABASE :
287292 // due to a bug in io.debezium.relational.ddl.AbstractDdlParser#signalDropDatabase
@@ -318,6 +323,19 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
318323 }
319324 }
320325
326+ // Mysql source during snapshotting process generates DROP TABLE and DROP Database
327+ // events. If the source is configured to ignore replication of the existing data, most likely target table
328+ // exists with the snapshot events and user do not want to re-do snapshotting. Do not generate the DROP
329+ // (Table/Database) events in such cases. If user do not want to keep the existing target tables if any, they
330+ // will have to delete those tables manually.
331+ private boolean generateDropEventOnSnapshot (boolean isSnapshot ) {
332+ if (!isSnapshot ) {
333+ // if not snapshot event, generate DROP event as it is part of explicit DDL operation from user
334+ return true ;
335+ }
336+ return replicateExistingData ;
337+ }
338+
321339 private boolean shouldEmitDdlEventForOperation (boolean readAllTables , SourceTable sourceTable , DDLOperation .Type op ) {
322340 return (!sourceTableNotValid (readAllTables , sourceTable )) &&
323341 (!isDDLOperationBlacklisted (readAllTables , sourceTable , op ));
0 commit comments