Skip to content

Commit 042e721

Browse files
authored
Merge pull request #97 from data-integrations/feature/emit-metric
CDAP-17371 Update replicator sources to call updated version of the metrics API.
2 parents b6a00cf + 9dfd55d commit 042e721

5 files changed

Lines changed: 61 additions & 50 deletions

File tree

delta-plugins-common/src/main/java/io/cdap/delta/plugin/mock/MockContext.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.cdap.delta.api.DeltaSourceContext;
2323
import io.cdap.delta.api.ReplicationError;
2424

25+
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.Map;
2728
import javax.annotation.Nullable;
@@ -69,6 +70,16 @@ public void count(String metricName, int delta) {
6970
public void gauge(String metricName, long value) {
7071
// no-op
7172
}
73+
74+
@Override
75+
public Metrics child(Map<String, String> tags) {
76+
return this;
77+
}
78+
79+
@Override
80+
public Map<String, String> getTags() {
81+
return Collections.emptyMap();
82+
}
7283
};
7384
}
7485

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,14 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Offset rec
159159
return;
160160
}
161161

162-
DMLOperation op;
162+
DMLOperation.Type op;
163163
String opStr = val.get("op");
164164
if ("c".equals(opStr)) {
165-
op = DMLOperation.INSERT;
165+
op = DMLOperation.Type.INSERT;
166166
} else if ("u".equals(opStr)) {
167-
op = DMLOperation.UPDATE;
167+
op = DMLOperation.Type.UPDATE;
168168
} else if ("d".equals(opStr)) {
169-
op = DMLOperation.DELETE;
169+
op = DMLOperation.Type.DELETE;
170170
} else {
171171
LOG.warn("Skipping unknown operation type '{}'", opStr);
172172
return;
@@ -199,17 +199,17 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Offset rec
199199
Long ingestTime = val.get("ts_ms");
200200
DMLEvent.Builder builder = DMLEvent.builder()
201201
.setOffset(recordOffset)
202-
.setOperation(op)
202+
.setOperationType(op)
203203
.setDatabase(databaseName)
204204
.setTable(tableName)
205205
.setTransactionId(transactionId)
206206
.setIngestTimestamp(ingestTime)
207207
.setSnapshot(isSnapshot);
208208

209209
// It is required for the source to provide the previous row if the dml operation is 'UPDATE'
210-
if (op == DMLOperation.UPDATE) {
210+
if (op == DMLOperation.Type.UPDATE) {
211211
emitter.emit(builder.setPreviousRow(before).setRow(after).build());
212-
} else if (op == DMLOperation.DELETE) {
212+
} else if (op == DMLOperation.Type.DELETE) {
213213
emitter.emit(builder.setRow(before).build());
214214
} else {
215215
emitter.emit(builder.setRow(after).build());
@@ -240,12 +240,12 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
240240
TableId tableId = alteredEvent.tableId();
241241
Table table = tables.forTable(tableId);
242242
SourceTable sourceTable = getSourceTable(database, tableId.table());
243-
DDLOperation ddlOp;
243+
DDLOperation.Type ddlOp;
244244
if (alteredEvent.previousTableId() != null) {
245-
ddlOp = DDLOperation.RENAME_TABLE;
245+
ddlOp = DDLOperation.Type.RENAME_TABLE;
246246
builder.setPrevTable(alteredEvent.previousTableId().table());
247247
} else {
248-
ddlOp = DDLOperation.ALTER_TABLE;
248+
ddlOp = DDLOperation.Type.ALTER_TABLE;
249249
}
250250

251251
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, ddlOp)) {
@@ -260,8 +260,8 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
260260
case DROP_TABLE:
261261
DdlParserListener.TableDroppedEvent droppedEvent = (DdlParserListener.TableDroppedEvent) event;
262262
sourceTable = getSourceTable(database, droppedEvent.tableId().table());
263-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.DROP_TABLE)) {
264-
ddlEvent = builder.setOperation(DDLOperation.DROP_TABLE)
263+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.Type.DROP_TABLE)) {
264+
ddlEvent = builder.setOperation(DDLOperation.Type.DROP_TABLE)
265265
.setTable(droppedEvent.tableId().table())
266266
.build();
267267
}
@@ -271,8 +271,8 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
271271
tableId = createdEvent.tableId();
272272
table = tables.forTable(tableId);
273273
sourceTable = getSourceTable(database, tableId.table());
274-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.CREATE_TABLE)) {
275-
ddlEvent = builder.setOperation(DDLOperation.CREATE_TABLE)
274+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.Type.CREATE_TABLE)) {
275+
ddlEvent = builder.setOperation(DDLOperation.Type.CREATE_TABLE)
276276
.setTable(tableId.table())
277277
.setSchema(readAllTables ? Records.getSchema(table, mySqlValueConverters) :
278278
Records.getSchema(table, mySqlValueConverters, sourceTable.getColumns()))
@@ -281,24 +281,24 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
281281
}
282282
break;
283283
case DROP_DATABASE:
284-
ddlEvent = builder.setOperation(DDLOperation.DROP_DATABASE).build();
284+
ddlEvent = builder.setOperation(DDLOperation.Type.DROP_DATABASE).build();
285285
break;
286286
case CREATE_DATABASE:
287287
// due to a bug in io.debezium.relational.ddl.AbstractDdlParser#signalDropDatabase
288288
// a DROP_DATABASE event will be mistakenly categorized as a CREATE_DATABASE event.
289289
// TODO: check if this is fixed in a newer debezium version
290290
if (event.statement() != null && event.statement().startsWith("DROP DATABASE")) {
291-
ddlEvent = builder.setOperation(DDLOperation.DROP_DATABASE).build();
291+
ddlEvent = builder.setOperation(DDLOperation.Type.DROP_DATABASE).build();
292292
} else {
293-
ddlEvent = builder.setOperation(DDLOperation.CREATE_DATABASE).build();
293+
ddlEvent = builder.setOperation(DDLOperation.Type.CREATE_DATABASE).build();
294294
}
295295
break;
296296
case TRUNCATE_TABLE:
297297
DdlParserListener.TableTruncatedEvent truncatedEvent =
298298
(DdlParserListener.TableTruncatedEvent) event;
299299
sourceTable = getSourceTable(database, truncatedEvent.tableId().table());
300-
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.TRUNCATE_TABLE)) {
301-
ddlEvent = builder.setOperation(DDLOperation.TRUNCATE_TABLE)
300+
if (shouldEmitDdlEventForOperation(readAllTables, sourceTable, DDLOperation.Type.TRUNCATE_TABLE)) {
301+
ddlEvent = builder.setOperation(DDLOperation.Type.TRUNCATE_TABLE)
302302
.setTable(truncatedEvent.tableId().table())
303303
.build();
304304
}
@@ -318,12 +318,12 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
318318
}
319319
}
320320

321-
private boolean shouldEmitDdlEventForOperation(boolean readAllTables, SourceTable sourceTable, DDLOperation op) {
321+
private boolean shouldEmitDdlEventForOperation(boolean readAllTables, SourceTable sourceTable, DDLOperation.Type op) {
322322
return (!sourceTableNotValid(readAllTables, sourceTable)) &&
323323
(!isDDLOperationBlacklisted(readAllTables, sourceTable, op));
324324
}
325325

326-
private boolean isDDLOperationBlacklisted(boolean readAllTables, SourceTable sourceTable, DDLOperation op) {
326+
private boolean isDDLOperationBlacklisted(boolean readAllTables, SourceTable sourceTable, DDLOperation.Type op) {
327327
// return true if record consumer was not set to read all table events and the DDL op has been
328328
// blacklisted for this table
329329
return !readAllTables && sourceTable.getDdlBlacklist().contains(op);

mysql-delta-plugins/src/test/java/io/cdap/delta/mysql/MySqlEventReaderIntegrationTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -153,29 +153,29 @@ public void test() throws InterruptedException {
153153
Assert.assertEquals(2, eventEmitter.getDmlEvents().size());
154154

155155
DDLEvent ddlEvent = eventEmitter.getDdlEvents().get(0);
156-
Assert.assertEquals(DDLOperation.DROP_TABLE, ddlEvent.getOperation());
156+
Assert.assertEquals(DDLOperation.Type.DROP_TABLE, ddlEvent.getOperation().getType());
157157
Assert.assertEquals(DB, ddlEvent.getDatabase());
158-
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
158+
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getOperation().getTableName());
159159

160160
ddlEvent = eventEmitter.getDdlEvents().get(1);
161-
Assert.assertEquals(DDLOperation.DROP_DATABASE, ddlEvent.getOperation());
161+
Assert.assertEquals(DDLOperation.Type.DROP_DATABASE, ddlEvent.getOperation().getType());
162162
Assert.assertEquals(DB, ddlEvent.getDatabase());
163163

164164
ddlEvent = eventEmitter.getDdlEvents().get(2);
165-
Assert.assertEquals(DDLOperation.CREATE_DATABASE, ddlEvent.getOperation());
165+
Assert.assertEquals(DDLOperation.Type.CREATE_DATABASE, ddlEvent.getOperation().getType());
166166
Assert.assertEquals(DB, ddlEvent.getDatabase());
167167

168168
ddlEvent = eventEmitter.getDdlEvents().get(3);
169-
Assert.assertEquals(DDLOperation.CREATE_TABLE, ddlEvent.getOperation());
169+
Assert.assertEquals(DDLOperation.Type.CREATE_TABLE, ddlEvent.getOperation().getType());
170170
Assert.assertEquals(DB, ddlEvent.getDatabase());
171-
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
171+
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getOperation().getTableName());
172172
Assert.assertEquals(Collections.singletonList("id"), ddlEvent.getPrimaryKey());
173173
Assert.assertEquals(CUSTOMERS_SCHEMA, ddlEvent.getSchema());
174174

175175
DMLEvent dmlEvent = eventEmitter.getDmlEvents().get(0);
176-
Assert.assertEquals(DMLOperation.INSERT, dmlEvent.getOperation());
176+
Assert.assertEquals(DMLOperation.Type.INSERT, dmlEvent.getOperation().getType());
177177
Assert.assertEquals(DB, dmlEvent.getDatabase());
178-
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getTable());
178+
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getOperation().getTableName());
179179
StructuredRecord row = dmlEvent.getRow();
180180
StructuredRecord expected = StructuredRecord.builder(CUSTOMERS_SCHEMA)
181181
.set("id", 0)
@@ -185,9 +185,9 @@ public void test() throws InterruptedException {
185185
Assert.assertEquals(expected, row);
186186

187187
dmlEvent = eventEmitter.getDmlEvents().get(1);
188-
Assert.assertEquals(DMLOperation.INSERT, dmlEvent.getOperation());
188+
Assert.assertEquals(DMLOperation.Type.INSERT, dmlEvent.getOperation().getType());
189189
Assert.assertEquals(DB, dmlEvent.getDatabase());
190-
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getTable());
190+
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getOperation().getTableName());
191191
row = dmlEvent.getRow();
192192
expected = StructuredRecord.builder(CUSTOMERS_SCHEMA)
193193
.set("id", 1)

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,14 @@ public void accept(SourceRecord sourceRecord) {
8080
Offset recordOffset = sqlServerOffset.getAsOffset();
8181

8282
StructuredRecord val = Records.convert((Struct) sourceRecord.value());
83-
DMLOperation op;
83+
DMLOperation.Type op;
8484
String opStr = val.get("op");
8585
if ("c".equals(opStr) || "r".equals(opStr)) {
86-
op = DMLOperation.INSERT;
86+
op = DMLOperation.Type.INSERT;
8787
} else if ("u".equals(opStr)) {
88-
op = DMLOperation.UPDATE;
88+
op = DMLOperation.Type.UPDATE;
8989
} else if ("d".equals(opStr)) {
90-
op = DMLOperation.DELETE;
90+
op = DMLOperation.Type.DELETE;
9191
} else {
9292
LOG.warn("Skipping unknown operation type '{}'", opStr);
9393
return;
@@ -120,7 +120,7 @@ public void accept(SourceRecord sourceRecord) {
120120
after = Records.keepSelectedColumns(after, sourceTable.getColumns());
121121
}
122122
}
123-
StructuredRecord value = op == DMLOperation.DELETE ? before : after;
123+
StructuredRecord value = op == DMLOperation.Type.DELETE ? before : after;
124124

125125
if (value == null) {
126126
// this is a safety check to prevent npe warning, it should not be null
@@ -148,14 +148,14 @@ public void accept(SourceRecord sourceRecord) {
148148

149149
try {
150150
// try to always drop the table before snapshot the schema.
151-
emitter.emit(builder.setOperation(DDLOperation.DROP_TABLE)
151+
emitter.emit(builder.setOperation(DDLOperation.Type.DROP_TABLE)
152152
.setTable(tableName)
153153
.build());
154154

155155
// try to emit create database event before create table event
156-
emitter.emit(builder.setOperation(DDLOperation.CREATE_DATABASE).build());
156+
emitter.emit(builder.setOperation(DDLOperation.Type.CREATE_DATABASE).build());
157157

158-
emitter.emit(builder.setOperation(DDLOperation.CREATE_TABLE)
158+
emitter.emit(builder.setOperation(DDLOperation.Type.CREATE_TABLE)
159159
.setTable(tableName)
160160
.setSchema(schema)
161161
.setPrimaryKey(primaryFields)
@@ -175,7 +175,7 @@ public void accept(SourceRecord sourceRecord) {
175175
Long ingestTime = val.get("ts_ms");
176176
DMLEvent.Builder dmlBuilder = DMLEvent.builder()
177177
.setOffset(recordOffset)
178-
.setOperation(op)
178+
.setOperationType(op)
179179
.setDatabase(databaseName)
180180
.setTable(tableName)
181181
.setRow(value)
@@ -184,7 +184,7 @@ public void accept(SourceRecord sourceRecord) {
184184
.setIngestTimestamp(ingestTime == null ? 0L : ingestTime);
185185

186186
// It is required for the source to provide the previous row if the operation is 'UPDATE'
187-
if (op == DMLOperation.UPDATE) {
187+
if (op == DMLOperation.Type.UPDATE) {
188188
dmlBuilder.setPreviousRow(before);
189189
}
190190

sqlserver-delta-plugins/src/test/java/io.cdap.delta.sqlserver/SqlServerEventReaderIntegrationTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,25 +161,25 @@ public void test() throws InterruptedException {
161161
Assert.assertEquals(2, eventEmitter.getDmlEvents().size());
162162

163163
DDLEvent ddlEvent = eventEmitter.getDdlEvents().get(0);
164-
Assert.assertEquals(DDLOperation.DROP_TABLE, ddlEvent.getOperation());
164+
Assert.assertEquals(DDLOperation.Type.DROP_TABLE, ddlEvent.getOperation().getType());
165165
Assert.assertEquals(DB, ddlEvent.getDatabase());
166-
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
166+
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getOperation().getTableName());
167167

168168
ddlEvent = eventEmitter.getDdlEvents().get(1);
169-
Assert.assertEquals(DDLOperation.CREATE_DATABASE, ddlEvent.getOperation());
169+
Assert.assertEquals(DDLOperation.Type.CREATE_DATABASE, ddlEvent.getOperation().getType());
170170
Assert.assertEquals(DB, ddlEvent.getDatabase());
171171

172172
ddlEvent = eventEmitter.getDdlEvents().get(2);
173-
Assert.assertEquals(DDLOperation.CREATE_TABLE, ddlEvent.getOperation());
173+
Assert.assertEquals(DDLOperation.Type.CREATE_TABLE, ddlEvent.getOperation().getType());
174174
Assert.assertEquals(DB, ddlEvent.getDatabase());
175-
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getTable());
175+
Assert.assertEquals(CUSTOMERS_TABLE, ddlEvent.getOperation().getTableName());
176176
Assert.assertEquals(Collections.singletonList("id"), ddlEvent.getPrimaryKey());
177177
Assert.assertEquals(CUSTOMERS_SCHEMA, ddlEvent.getSchema());
178178

179179
DMLEvent dmlEvent = eventEmitter.getDmlEvents().get(0);
180-
Assert.assertEquals(DMLOperation.INSERT, dmlEvent.getOperation());
180+
Assert.assertEquals(DMLOperation.Type.INSERT, dmlEvent.getOperation().getType());
181181
Assert.assertEquals(DB, dmlEvent.getDatabase());
182-
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getTable());
182+
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getOperation().getTableName());
183183
StructuredRecord row = dmlEvent.getRow();
184184
StructuredRecord expected = StructuredRecord.builder(CUSTOMERS_SCHEMA)
185185
.set("id", 0)
@@ -189,9 +189,9 @@ public void test() throws InterruptedException {
189189
Assert.assertEquals(expected, row);
190190

191191
dmlEvent = eventEmitter.getDmlEvents().get(1);
192-
Assert.assertEquals(DMLOperation.INSERT, dmlEvent.getOperation());
192+
Assert.assertEquals(DMLOperation.Type.INSERT, dmlEvent.getOperation().getType());
193193
Assert.assertEquals(DB, dmlEvent.getDatabase());
194-
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getTable());
194+
Assert.assertEquals(CUSTOMERS_TABLE, dmlEvent.getOperation().getTableName());
195195
row = dmlEvent.getRow();
196196
expected = StructuredRecord.builder(CUSTOMERS_SCHEMA)
197197
.set("id", 1)

0 commit comments

Comments
 (0)