Skip to content

Commit e6fcdd7

Browse files
authored
[Java][Debezium] Fix NPE in debeziumRecordInstant for DELETE events (#37795)
* Fix #37738: handle Debezium DELETE records without valueSchema * refactor: replace fully qualified class names with imports in KafkaConnectSchemaTest.
1 parent 7d756c2 commit e6fcdd7

2 files changed

Lines changed: 58 additions & 10 deletions

File tree

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,26 @@ public static Schema.FieldType beamSchemaTypeFromKafkaType(
8080
}
8181

8282
public static Instant debeziumRecordInstant(SourceRecord record) {
83-
if (!record.valueSchema().type().equals(org.apache.kafka.connect.data.Schema.Type.STRUCT)
84-
|| record.valueSchema().field("ts_ms") == null) {
85-
throw new IllegalArgumentException(
86-
"Debezium record received is not of the right kind. "
87-
+ String.format(
88-
"Should be STRUCT with ts_ms field. Instead it is: %s", record.valueSchema()));
83+
if (record.valueSchema() != null
84+
&& record.valueSchema().type().equals(org.apache.kafka.connect.data.Schema.Type.STRUCT)
85+
&& record.valueSchema().field("ts_ms") != null
86+
&& record.value() != null) {
87+
Struct recordValue = (Struct) record.value();
88+
return Instant.ofEpochMilli(recordValue.getInt64("ts_ms"));
8989
}
90-
Struct recordValue = (Struct) record.value();
91-
return Instant.ofEpochMilli(recordValue.getInt64("ts_ms"));
90+
91+
if (record.sourceOffset() != null && record.sourceOffset().containsKey("ts_usec")) {
92+
Object tsUsecValue = record.sourceOffset().get("ts_usec");
93+
if (tsUsecValue instanceof Number) {
94+
return Instant.ofEpochMilli(((Number) tsUsecValue).longValue() / 1000);
95+
}
96+
}
97+
98+
throw new IllegalArgumentException(
99+
"Debezium record received is not of the right kind. "
100+
+ String.format(
101+
"Should be STRUCT with ts_ms field or sourceOffset with ts_usec. Instead it is: %s, %s",
102+
record.valueSchema(), record.sourceOffset()));
92103
}
93104

94105
public static SourceRecordMapper<Row> beamRowFromSourceRecordFn(final Schema recordSchema) {

sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
import static org.hamcrest.MatcherAssert.assertThat;
2121
import static org.junit.Assert.assertThrows;
2222

23+
import java.util.Collections;
2324
import org.apache.beam.sdk.schemas.Schema;
25+
import org.apache.kafka.connect.source.SourceRecord;
2426
import org.hamcrest.Matchers;
27+
import org.joda.time.Instant;
2528
import org.junit.Test;
2629
import org.junit.runner.RunWith;
2730
import org.junit.runners.JUnit4;
@@ -59,11 +62,45 @@ public void testSimpleSourceRecordSchemaConversion() {
5962

6063
@Test
6164
public void testTimestampRequired() {
62-
org.apache.kafka.connect.source.SourceRecord record = SourceRecordJsonTest.buildSourceRecord();
65+
SourceRecord record = SourceRecordJsonTest.buildSourceRecord();
6366

6467
IllegalArgumentException e =
6568
assertThrows(
6669
IllegalArgumentException.class, () -> KafkaConnectUtils.debeziumRecordInstant(record));
67-
assertThat(e.getMessage(), Matchers.containsString("Should be STRUCT with ts_ms field"));
70+
assertThat(
71+
e.getMessage(),
72+
Matchers.containsString("Should be STRUCT with ts_ms field or sourceOffset with ts_usec"));
73+
}
74+
75+
@Test
76+
public void testDebeziumRecordInstantNullValueSchema() {
77+
SourceRecord record =
78+
new SourceRecord(
79+
Collections.singletonMap("server", "test"),
80+
Collections.singletonMap("ts_usec", 1614854400000000L),
81+
"test-topic",
82+
null,
83+
null);
84+
85+
Instant instant = KafkaConnectUtils.debeziumRecordInstant(record);
86+
assertThat(instant.getMillis(), Matchers.is(1614854400000L));
87+
}
88+
89+
@Test
90+
public void testDebeziumRecordInstantMissingTimestamp() {
91+
SourceRecord record =
92+
new SourceRecord(
93+
Collections.singletonMap("server", "test"),
94+
Collections.emptyMap(),
95+
"test-topic",
96+
null,
97+
null);
98+
99+
IllegalArgumentException e =
100+
assertThrows(
101+
IllegalArgumentException.class, () -> KafkaConnectUtils.debeziumRecordInstant(record));
102+
assertThat(
103+
e.getMessage(),
104+
Matchers.containsString("Should be STRUCT with ts_ms field or sourceOffset with ts_usec"));
68105
}
69106
}

0 commit comments

Comments
 (0)