Skip to content

Commit 9dfd55d

Browse files
committed
CDAP-17371 Update replicator sources to call updated version of the metrics API.
1 parent 77c8d13 commit 9dfd55d

5 files changed

Lines changed: 61 additions & 50 deletions

File tree

delta-plugins-common/src/main/java/io/cdap/delta/plugin/mock/MockContext.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.cdap.delta.api.DeltaSourceContext;
2323
import io.cdap.delta.api.ReplicationError;
2424

25+
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.Map;
2728
import javax.annotation.Nullable;
@@ -69,6 +70,16 @@ public void count(String metricName, int delta) {
6970
public void gauge(String metricName, long value) {
7071
// no-op
7172
}
73+
74+
@Override
75+
public Metrics child(Map<String, String> tags) {
76+
return this;
77+
}
78+
79+
@Override
80+
public Map<String, String> getTags() {
81+
return Collections.emptyMap();
82+
}
7283
};
7384
}
7485

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,14 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Offset rec
159159
return;
160160
}
161161

162-
DMLOperation op;
162+
DMLOperation.Type op;
163163
String opStr = val.get("op");
164164
if ("c".equals(opStr)) {
165-
op = DMLOperation.INSERT;
165+
op = DMLOperation.Type.INSERT;
166166
} else if ("u".equals(opStr)) {
167-
op = DMLOperation.UPDATE;
167+
op = DMLOperation.Type.UPDATE;
168168
} else if ("d".equals(opStr)) {
169-
op = DMLOperation.DELETE;
169+
op = DMLOperation.Type.DELETE;
170170
} else {
171171
LOG.warn("Skipping unknown operation type '{}'", opStr);
172172
return;
@@ -199,17 +199,17 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Offset rec
199199
Long ingestTime = val.get("ts_ms");
200200
DMLEvent.Builder builder = DMLEvent.builder()
201201
.setOffset(recordOffset)
202-
.setOperation(op)
202+
.setOperationType(op)
203203
.setDatabase(databaseName)
204204
.setTable(tableName)
205205
.setTransactionId(transactionId)
206206
.setIngestTimestamp(ingestTime)
207207
.setSnapshot(isSnapshot);
208208

209209
// It is required for the source to provide the previous row if the dml operation is 'UPDATE'
210-
if (op == DMLOperation.UPDATE) {
210+
if (op == DMLOperation.Type.UPDATE) {
211211
emitter.emit(builder.setPreviousRow(before).setRow(after).build());
212-
} else if (op == DMLOperation.DELETE) {
212+
} else if (op == DMLOperation.Type.DELETE) {
213213
emitter.emit(builder.setRow(before).build());
214214
} else {
215215
emitter.emit(builder.setRow(after).build());
@@ -240,12 +240,12 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
240240
TableId tableId = alteredEvent.tableId();
241241
Table table = tables.forTable(tableId);
242242
SourceTable sourceTable = getSourceTable(database, tableId.table());
243-
DDLOperation ddlOp;
243+
DDLOperation.Type ddlOp;
244244
if (alteredEvent.previousTableId() != null) {
245-
ddlOp = DDLOperation.RENAME_TABLE;
245+
ddlOp = DDLOperation.Type.RENAME_TABLE;
246246
builder.setPrevTable(alteredEvent.previousTableId().table());
247247
} else {
248-
ddlOp = DDLOperation.ALTER_TABLE;
248+
ddlOp = DDLOperation.Type.ALTER_TABLE;
249249
}
250250

251251
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, ddlOp)) {
@@ -260,8 +260,8 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
260260
case DROP_TABLE:
261261
DdlParserListener.TableDroppedEvent droppedEvent = (DdlParserListener.TableDroppedEvent) event;
262262
sourceTable = getSourceTable(database, droppedEvent.tableId().table());
263-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.DROP_TABLE)) {
264-
ddlEvent = builder.setOperation(DDLOperation.DROP_TABLE)
263+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.Type.DROP_TABLE)) {
264+
ddlEvent = builder.setOperation(DDLOperation.Type.DROP_TABLE)
265265
.setTable(droppedEvent.tableId().table())
266266
.build();
267267
}
@@ -271,8 +271,8 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
271271
tableId = createdEvent.tableId();
272272
table = tables.forTable(tableId);
273273
sourceTable = getSourceTable(database, tableId.table());
274-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.CREATE_TABLE)) {
275-
ddlEvent = builder.setOperation(DDLOperation.CREATE_TABLE)
274+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.Type.CREATE_TABLE)) {
275+
ddlEvent = builder.setOperation(DDLOperation.Type.CREATE_TABLE)
276276
.setTable(tableId.table())
277277
.setSchema(readAllTables ? Records.getSchema(table, mySqlValueConverters) :
278278
Records.getSchema(table, mySqlValueConverters, sourceTable.getColumns()))
@@ -281,24 +281,24 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
281281
}
282282
break;
283283
case DROP_DATABASE:
284-
ddlEvent = builder.setOperation(DDLOperation.DROP_DATABASE).build();
284+
ddlEvent = builder.setOperation(DDLOperation.Type.DROP_DATABASE).build();
285285
break;
286286
case CREATE_DATABASE:
287287
// due to a bug in io.debezium.relational.ddl.AbstractDdlParser#signalDropDatabase
288288
// a DROP_DATABASE event will be mistakenly categorized as a CREATE_DATABASE event.
289289
// TODO: check if this is fixed in a newer debezium version
290290
if (event.statement() != null && event.statement().startsWith("DROP DATABASE")) {
291-
ddlEvent = builder.setOperation(DDLOperation.DROP_DATABASE).build();
291+
ddlEvent = builder.setOperation(DDLOperation.Type.DROP_DATABASE).build();
292292
} else {
293-
ddlEvent = builder.setOperation(DDLOperation.CREATE_DATABASE).build();
293+
ddlEvent = builder.setOperation(DDLOperation.Type.CREATE_DATABASE).build();
294294
}
295295
break;
296296
case TRUNCATE_TABLE:
297297
DdlParserListener.TableTruncatedEvent truncatedEvent =
298298
(DdlParserListener.TableTruncatedEvent) event;
299299
sourceTable = getSourceTable(database, truncatedEvent.tableId().table());
300-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.TRUNCATE_TABLE)) {
301-
ddlEvent = builder.setOperation(DDLOperation.TRUNCATE_TABLE)
300+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.Type.TRUNCATE_TABLE)) {
301+
ddlEvent = builder.setOperation(DDLOperation.Type.TRUNCATE_TABLE)
302302
.setTable(truncatedEvent.tableId().table())
303303
.build();
304304
}
@@ -318,12 +318,12 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
318318
}
319319
}
320320

321-
private boolean shouldEmitDdlEventForOperation(boolean readAllTables, SourceTable sourceTable, DDLOperation op) {
321+
private boolean shouldEmitDdlEventForOperation(boolean readAllTables, SourceTable sourceTable, DDLOperation.Type op) {
322322
return (!sourceTableNotValid(readAllTables, sourceTable)) &&
323323
(!isDDLOperationBlacklisted(readAllTables, sourceTable, op));
324324
}
325325

326-
private boolean isDDLOperationBlacklisted(boolean readAllTables, SourceTable sourceTable, DDLOperation op) {
326+
private boolean isDDLOperationBlacklisted(boolean readAllTables, SourceTable sourceTable, DDLOperation.Type op) {
327327
// return true if record consumer was not set to read all table events and the DDL op has been
328328
// blacklisted for this table
329329
return !readAllTables && sourceTable.getDdlBlacklist().contains(op);

mysql-delta-plugins/src/test/java/io/cdap/delta/mysql/MySqlEventReaderIntegrationTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,29 +148,29 @@ public void test() throws InterruptedException {
148148
Assert.assertEquals(2, eventEmitter.getDmlEvents().size());
149149

150150
DDLEvent ddlEvent = eventEmitter.getDdlEvents().get(0);
151-
Assert.assertEquals(DDLOperation.DROP_TABLE, ddlEvent.getOperation());
151+
Assert.assertEquals(DDLOperation.Type.DROP_TABLE, ddlEvent.getOperation().getType());
152152
Assert.assertEquals(DB, ddlEvent.getDatabase());
153-
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
153+
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getOperation().getTableName());
154154

155155
ddlEvent = eventEmitter.getDdlEvents().get(1);
156-
Assert.assertEquals(DDLOperation.DROP_DATABASE, ddlEvent.getOperation());
156+
Assert.assertEquals(DDLOperation.Type.DROP_DATABASE, ddlEvent.getOperation().getType());
157157
Assert.assertEquals(DB, ddlEvent.getDatabase());
158158

159159
ddlEvent = eventEmitter.getDdlEvents().get(2);
160-
Assert.assertEquals(DDLOperation.CREATE_DATABASE, ddlEvent.getOperation());
160+
Assert.assertEquals(DDLOperation.Type.CREATE_DATABASE, ddlEvent.getOperation().getType());
161161
Assert.assertEquals(DB, ddlEvent.getDatabase());
162162

163163
ddlEvent = eventEmitter.getDdlEvents().get(3);
164-
Assert.assertEquals(DDLOperation.CREATE_TABLE, ddlEvent.getOperation());
164+
Assert.assertEquals(DDLOperation.Type.CREATE_TABLE, ddlEvent.getOperation().getType());
165165
Assert.assertEquals(DB, ddlEvent.getDatabase());
166-
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
166+
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getOperation().getTableName());
167167
Assert.assertEquals(Collections.singletonList("id"), ddlEvent.getPrimaryKey());
168168
Assert.assertEquals(CUSTOMERS_SCHEMA, ddlEvent.getSchema());
169169

170170
DMLEvent dmlEvent = eventEmitter.getDmlEvents().get(0);
171-
Assert.assertEquals(DMLOperation.INSERT, dmlEvent.getOperation());
171+
Assert.assertEquals(DMLOperation.Type.INSERT, dmlEvent.getOperation().getType());
172172
Assert.assertEquals(DB, dmlEvent.getDatabase());
173-
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getTable());
173+
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getOperation().getTableName());
174174
StructuredRecord row = dmlEvent.getRow();
175175
StructuredRecord expected = StructuredRecord.builder(CUSTOMERS_SCHEMA)
176176
.set("id", 0)
@@ -180,9 +180,9 @@ public void test() throws InterruptedException {
180180
Assert.assertEquals(expected, row);
181181

182182
dmlEvent = eventEmitter.getDmlEvents().get(1);
183-
Assert.assertEquals(DMLOperation.INSERT, dmlEvent.getOperation());
183+
Assert.assertEquals(DMLOperation.Type.INSERT, dmlEvent.getOperation().getType());
184184
Assert.assertEquals(DB, dmlEvent.getDatabase());
185-
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getTable());
185+
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getOperation().getTableName());
186186
row = dmlEvent.getRow();
187187
expected = StructuredRecord.builder(CUSTOMERS_SCHEMA)
188188
.set("id", 1)

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,14 @@ public void accept(SourceRecord sourceRecord) {
8080
Offset recordOffset = sqlServerOffset.getAsOffset();
8181

8282
StructuredRecord val = Records.convert((Struct) sourceRecord.value());
83-
DMLOperation op;
83+
DMLOperation.Type op;
8484
String opStr = val.get("op");
8585
if ("c".equals(opStr) || "r".equals(opStr)) {
86-
op = DMLOperation.INSERT;
86+
op = DMLOperation.Type.INSERT;
8787
} else if ("u".equals(opStr)) {
88-
op = DMLOperation.UPDATE;
88+
op = DMLOperation.Type.UPDATE;
8989
} else if ("d".equals(opStr)) {
90-
op = DMLOperation.DELETE;
90+
op = DMLOperation.Type.DELETE;
9191
} else {
9292
LOG.warn("Skipping unknown operation type '{}'", opStr);
9393
return;
@@ -120,7 +120,7 @@ public void accept(SourceRecord sourceRecord) {
120120
after = Records.keepSelectedColumns(after, sourceTable.getColumns());
121121
}
122122
}
123-
StructuredRecord value = op == DMLOperation.DELETE ? before : after;
123+
StructuredRecord value = op == DMLOperation.Type.DELETE ? before : after;
124124

125125
if (value == null) {
126126
// this is a safety check to prevent npe warning, it should not be null
@@ -148,14 +148,14 @@ public void accept(SourceRecord sourceRecord) {
148148

149149
try {
150150
// try to always drop the table before snapshot the schema.
151-
emitter.emit(builder.setOperation(DDLOperation.DROP_TABLE)
151+
emitter.emit(builder.setOperation(DDLOperation.Type.DROP_TABLE)
152152
.setTable(tableName)
153153
.build());
154154

155155
// try to emit create database event before create table event
156-
emitter.emit(builder.setOperation(DDLOperation.CREATE_DATABASE).build());
156+
emitter.emit(builder.setOperation(DDLOperation.Type.CREATE_DATABASE).build());
157157

158-
emitter.emit(builder.setOperation(DDLOperation.CREATE_TABLE)
158+
emitter.emit(builder.setOperation(DDLOperation.Type.CREATE_TABLE)
159159
.setTable(tableName)
160160
.setSchema(schema)
161161
.setPrimaryKey(primaryFields)
@@ -175,7 +175,7 @@ public void accept(SourceRecord sourceRecord) {
175175
Long ingestTime = val.get("ts_ms");
176176
DMLEvent.Builder dmlBuilder = DMLEvent.builder()
177177
.setOffset(recordOffset)
178-
.setOperation(op)
178+
.setOperationType(op)
179179
.setDatabase(databaseName)
180180
.setTable(tableName)
181181
.setRow(value)
@@ -184,7 +184,7 @@ public void accept(SourceRecord sourceRecord) {
184184
.setIngestTimestamp(ingestTime == null ? 0L : ingestTime);
185185

186186
// It is required for the source to provide the previous row if the operation is 'UPDATE'
187-
if (op == DMLOperation.UPDATE) {
187+
if (op == DMLOperation.Type.UPDATE) {
188188
dmlBuilder.setPreviousRow(before);
189189
}
190190

sqlserver-delta-plugins/src/test/java/io.cdap.delta.sqlserver/SqlServerEventReaderIntegrationTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -156,25 +156,25 @@ public void test() throws InterruptedException {
156156
Assert.assertEquals(2, eventEmitter.getDmlEvents().size());
157157

158158
DDLEvent ddlEvent = eventEmitter.getDdlEvents().get(0);
159-
Assert.assertEquals(DDLOperation.DROP_TABLE, ddlEvent.getOperation());
159+
Assert.assertEquals(DDLOperation.Type.DROP_TABLE, ddlEvent.getOperation().getType());
160160
Assert.assertEquals(DB, ddlEvent.getDatabase());
161-
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
161+
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getOperation().getTableName());
162162

163163
ddlEvent = eventEmitter.getDdlEvents().get(1);
164-
Assert.assertEquals(DDLOperation.CREATE_DATABASE, ddlEvent.getOperation());
164+
Assert.assertEquals(DDLOperation.Type.CREATE_DATABASE, ddlEvent.getOperation().getType());
165165
Assert.assertEquals(DB, ddlEvent.getDatabase());
166166

167167
ddlEvent = eventEmitter.getDdlEvents().get(2);
168-
Assert.assertEquals(DDLOperation.CREATE_TABLE, ddlEvent.getOperation());
168+
Assert.assertEquals(DDLOperation.Type.CREATE_TABLE, ddlEvent.getOperation().getType());
169169
Assert.assertEquals(DB, ddlEvent.getDatabase());
170-
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
170+
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getOperation().getTableName());
171171
Assert.assertEquals(Collections.singletonList("id"), ddlEvent.getPrimaryKey());
172172
Assert.assertEquals(CUSTOMERS_SCHEMA, ddlEvent.getSchema());
173173

174174
DMLEvent dmlEvent = eventEmitter.getDmlEvents().get(0);
175-
Assert.assertEquals(DMLOperation.INSERT, dmlEvent.getOperation());
175+
Assert.assertEquals(DMLOperation.Type.INSERT, dmlEvent.getOperation().getType());
176176
Assert.assertEquals(DB, dmlEvent.getDatabase());
177-
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getTable());
177+
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getOperation().getTableName());
178178
StructuredRecord row = dmlEvent.getRow();
179179
StructuredRecord expected = StructuredRecord.builder(CUSTOMERS_SCHEMA)
180180
.set("id", 0)
@@ -184,9 +184,9 @@ public void test() throws InterruptedException {
184184
Assert.assertEquals(expected, row);
185185

186186
dmlEvent = eventEmitter.getDmlEvents().get(1);
187-
Assert.assertEquals(DMLOperation.INSERT, dmlEvent.getOperation());
187+
Assert.assertEquals(DMLOperation.Type.INSERT, dmlEvent.getOperation().getType());
188188
Assert.assertEquals(DB, dmlEvent.getDatabase());
189-
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getTable());
189+
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getOperation().getTableName());
190190
row = dmlEvent.getRow();
191191
expected = StructuredRecord.builder(CUSTOMERS_SCHEMA)
192192
.set("id", 1)

0 commit comments

Comments
 (0)