Skip to content

Commit f6d1dff

Browse files
jogrogan and claude authored
Improve YAML parsing robustness and handle null Avro types (#202)
* Wrap Dynamics.newFromYaml() to handle malformed YAML

  snakeyaml throws IndexOutOfBoundsException when parsing malformed YAML via Dynamics.newFromYaml(). The call in estimateElementStatus() was outside the existing try/catch, causing the exception to propagate up through the JDBC layer and crash getPipelineStatus requests.

  - Wrap Dynamics.newFromYaml() in estimateElementStatus() with try/catch; return a default unready status on parse failure instead of propagating
  - Same fix in getElementConfiguration(), which had the identical pattern
  - Add tests for both: malformed YAML returns unready status / empty map

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: Handle SnakeYAML parsing exceptions in operator

  SnakeYAML throws ScannerException, ParserException, and ConstructorException when parsing malformed YAML in Kubernetes resource specs or annotations. Two call sites were unprotected:

  1. Operator.isReady(String yaml) and Operator.isFailed(String yaml): the Dynamics.newFromYaml() call was outside the existing try/catch, so any parse error would propagate up and crash the SubscriptionReconciler (stack trace through SubscriptionReconciler.reconcile line 171 -> Operator.isReady line 180).
  2. SubscriptionReconciler.fetchAttributes(String yaml): same unprotected Dynamics.newFromYaml() call; it now returns an empty map on parse failure instead.

  The K8sPipelineElementStatusEstimator call site was already fixed in a prior commit; this commit adds additional tests covering ScannerException, ParserException, and ConstructorException for completeness.

  Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: Guard against NullPointerException in K8sPipelineElementStatusEstimator

  When Dynamics.newFromYaml() parses YAML without a metadata field, the resulting DynamicKubernetesObject has null metadata. Calling obj.getMetadata().getName() on it caused NPE in estimateElementStatus().

  Guard against null metadata by returning an unready status early, consistent with how we handle other parse/retrieval failures.

  Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: handle null Avro schemas in AvroConverter to prevent NPE

  Add null guard at entry to AvroConverter.rel() and explicit NULL type handling. When a null schema or NULL-typed schema is encountered, returns a proper SQL NULL type instead of throwing NPE.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: downgrade YAML exception logs ERROR→WARN, add null-safety guards

  - K8sPipelineElementStatusEstimator: downgrade YAML parse errors from ERROR to WARN — exceptions are caught and handled correctly, ERROR level was causing spurious automated tickets
  - Operator.isReady/isFailed: guard against null metadata and null namespace in K8s resource
  - SubscriptionReconciler.fetchAttributes: guard against null metadata; downgrade SQL validation errors to WARN level

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Clean up comments & tests

* Build fixes - forgot to commit

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 3842f9d commit f6d1dff

10 files changed

Lines changed: 397 additions & 8 deletions

File tree

hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,13 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
163163
* TODO: default field values are lost when converting from Avro to RelDataType
164164
*/
165165
public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory, boolean nullable) {
166+
if (schema == null) {
167+
return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.NULL), true);
168+
}
166169
RelDataType unknown = typeFactory.createUnknownType();
167170
switch (schema.getType()) {
171+
case NULL:
172+
return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.NULL), true);
168173
case RECORD:
169174
return typeFactory.createTypeWithNullability(typeFactory.createStructType(schema.getFields().stream()
170175
.map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory, nullable)))

hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroConverterTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,55 @@ public void convertsNestedArray() {
217217
assertEquals(Schema.Type.STRING, innermostField.getTypes().get(1).getType());
218218
}
219219

220+
@Test
221+
public void convertsNullableUnionFields() {
222+
// Record with two nullable unions (["null","string"], ["null","int"]) and one plain string field; only the two union fields are checked for nullability.
223+
String schemaString =
224+
"{\"type\":\"record\",\"name\":\"R\",\"namespace\":\"ns\",\"fields\":["
225+
+ "{\"name\":\"nullableStr\",\"type\":[\"null\",\"string\"]},"
226+
+ "{\"name\":\"nullableInt\",\"type\":[\"null\",\"int\"]},"
227+
+ "{\"name\":\"requiredStr\",\"type\":\"string\"}]}";
228+
Schema avroSchema = (new Schema.Parser()).parse(schemaString);
229+
RelDataType rel = AvroConverter.rel(avroSchema);
230+
assertNotNull(rel);
231+
assertEquals(3, rel.getFieldCount());
232+
assertNotNull(rel.getField("nullableStr", false, false));
233+
assertTrue(Objects.requireNonNull(rel.getField("nullableStr", false, false)).getType().isNullable());
234+
assertNotNull(rel.getField("nullableInt", false, false));
235+
assertTrue(Objects.requireNonNull(rel.getField("nullableInt", false, false)).getType().isNullable());
236+
assertNotNull(rel.getField("requiredStr", false, false));
237+
}
238+
239+
@Test
240+
public void convertsNullTypeField() {
241+
// Record with one field whose type is the bare Avro "null" type and one ordinary string field.
242+
String schemaString =
243+
"{\"type\":\"record\",\"name\":\"R\",\"namespace\":\"ns\",\"fields\":["
244+
+ "{\"name\":\"nullField\",\"type\":\"null\"},"
245+
+ "{\"name\":\"strField\",\"type\":\"string\"}]}";
246+
Schema avroSchema = (new Schema.Parser()).parse(schemaString);
247+
RelDataType rel = AvroConverter.rel(avroSchema);
248+
assertNotNull(rel);
249+
// Only strField is asserted; whether nullField is kept in or dropped from the result is not pinned by this test.
250+
assertNotNull(rel.getField("strField", false, false));
251+
}
252+
253+
@Test
254+
public void convertsNullSchema() {
255+
// A standalone NULL-typed schema must convert without throwing; only a non-null result is asserted, not the resulting SQL type.
256+
Schema nullSchema = Schema.create(Schema.Type.NULL);
257+
RelDataType rel = AvroConverter.rel(nullSchema);
258+
assertNotNull(rel);
259+
}
260+
261+
@Test
262+
public void handlesNullSchemaParameter() {
263+
// A null Schema reference must not throw; only a non-null result is asserted, not the resulting SQL type.
264+
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
265+
RelDataType rel = AvroConverter.rel(null, typeFactory);
266+
assertNotNull(rel);
267+
}
268+
220269
@Test
221270
public void handlesNamespaceInNestedArrayAndMapElements() {
222271
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineElementApi.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,13 @@ List<String> getPipelineElements(V1alpha1Pipeline pipeline) {
6969
* Returns spec.configs of the given element, if present.
7070
*/
7171
Map<String, String> getElementConfiguration(String elementYaml, String pipelineNamespace) {
72-
DynamicKubernetesObject obj = Dynamics.newFromYaml(elementYaml);
72+
DynamicKubernetesObject obj;
73+
try {
74+
obj = Dynamics.newFromYaml(elementYaml);
75+
} catch (Exception e) {
76+
log.error("Failed to parse element YAML for configuration extraction: {}", e.getMessage(), e);
77+
return new HashMap<>();
78+
}
7379
String name = obj.getMetadata().getName();
7480
String namespace = obj.getMetadata().getNamespace() == null ? pipelineNamespace : obj.getMetadata().getNamespace();
7581
String kind = obj.getKind();

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/status/K8sPipelineElementStatusEstimator.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,19 @@ public List<K8sPipelineElementStatus> estimateStatuses(V1alpha1Pipeline pipeline
4848
* Estimates status of an element. If we can not retrieve it from K8s, we assume that it's not ready and not failed yet.
4949
*/
5050
public K8sPipelineElementStatus estimateElementStatus(String elementYaml, String pipelineNamespace) {
51-
DynamicKubernetesObject obj = Dynamics.newFromYaml(elementYaml);
51+
DynamicKubernetesObject obj;
52+
try {
53+
obj = Dynamics.newFromYaml(elementYaml);
54+
} catch (Exception e) {
55+
String message = String.format("Failed to parse element YAML: %s", e.getMessage());
56+
log.warn(message, e);
57+
return defaultUnreadyStatusOnK8sObjectRetrievalFailure("unknown", message);
58+
}
59+
if (obj.getMetadata() == null) {
60+
String message = "Failed to parse element YAML: null metadata";
61+
log.warn(message);
62+
return defaultUnreadyStatusOnK8sObjectRetrievalFailure("unknown", message);
63+
}
5264
String name = obj.getMetadata().getName();
5365
String namespace = obj.getMetadata().getNamespace() == null ? pipelineNamespace : obj.getMetadata().getNamespace();
5466
String kind = obj.getKind();

hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sPipelineElementApiTest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,15 @@ void testGetElementConfigurationNullElement() {
250250
assertTrue(result.isEmpty());
251251
}
252252

253+
@Test
254+
void testGetElementConfigurationMalformedYaml() {
255+
// Malformed input (contains an unclosed '[') is expected to make YAML parsing throw. NOTE(review): the exception type (IndexOutOfBoundsException?) is not checked here.
256+
// getElementConfiguration() must catch the failure and return an empty map rather than propagating.
257+
String malformedYaml = "malformed: yaml: [unclosed";
258+
Map<String, String> result = api.getElementConfiguration(malformedYaml, "default");
259+
assertTrue(result.isEmpty());
260+
}
261+
253262
@Test
254263
void testGetElementConfigurationExceptionThrown() {
255264
// Given
@@ -300,4 +309,4 @@ private void setupMockForElementConfiguration(JsonObject rootJson, boolean succe
300309
when(mockDynamicApi.get(anyString(), anyString())).thenReturn(mockApiResponse);
301310
when(mockContext.dynamic(anyString(), anyString())).thenReturn(mockDynamicApi);
302311
}
303-
}
312+
}

hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/status/K8sPipelineElementStatusEstimatorTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ public class K8sPipelineElementStatusEstimatorTest {
8181
@BeforeEach
8282
void setUp() {
8383
// Only the estimator is created here; pipeline stubbing lives in setUpPipelineMocks(), which tests call explicitly when they need it.
estimator = new K8sPipelineElementStatusEstimator(context);
84+
}
85+
86+
private void setUpPipelineMocks() {
8487
when(pipelineMetadata.getNamespace()).thenReturn("fake-namespace");
8588
when(pipeline.getMetadata()).thenReturn(pipelineMetadata);
8689
when(pipeline.getSpec()).thenReturn(pipelineSpec);
@@ -92,6 +95,7 @@ void setUp() {
9295

9396
@Test
9497
void testEstimateWhenPipelineHasMultipleElements() {
98+
setUpPipelineMocks();
9599
when(pipelineSpec.getYaml()).thenReturn(FAKE_MULTIPLE_SPECS);
96100
// Set up: failed to get kafka object from K8s.
97101
when(kafkaDynamicKubernetesApiResponse.isSuccess()).thenReturn(false);
@@ -121,6 +125,7 @@ void testEstimateWhenPipelineHasMultipleElements() {
121125

122126
@Test
123127
void testEstimateWhenPipelineHasSingleElementWithK8sObjectHavingStatusFieldWithReadyInfo() {
128+
setUpPipelineMocks();
124129
mockJobDynamicObjectWithStatusField();
125130
JsonElement readyElement = mock(JsonElement.class);
126131
when(readyElement.getAsBoolean()).thenReturn(true);
@@ -152,6 +157,7 @@ private void mockJobDynamicObjectWithStatusField() {
152157
@FieldSource("READY_STRINGS")
153158
@FieldSource("FAILED_STRINGS")
154159
void testEstimateWhenPipelineHasSingleElementWithK8sObjectHavingStatusFieldWithStateInfo(String state) {
160+
setUpPipelineMocks();
155161
mockJobDynamicObjectWithStatusField();
156162
JsonElement stateElement = mock(JsonElement.class);
157163
when(stateElement.getAsString()).thenReturn(state);
@@ -169,6 +175,7 @@ void testEstimateWhenPipelineHasSingleElementWithK8sObjectHavingStatusFieldWithS
169175
@FieldSource("READY_STRINGS")
170176
@FieldSource("FAILED_STRINGS")
171177
void testEstimateWhenPipelineHasSingleElementWithK8sObjectHavingStatusFieldWithJobStatusStateInfo(String state) {
178+
setUpPipelineMocks();
172179
mockJobDynamicObjectWithStatusField();
173180
JsonElement stateElement = mock(JsonElement.class);
174181
when(stateElement.getAsString()).thenReturn(state);
@@ -189,6 +196,7 @@ void testEstimateWhenPipelineHasSingleElementWithK8sObjectHavingStatusFieldWithJ
189196

190197
@Test
191198
void testEstimateWhenPipelineHasSingleElementWithK8sObjectHavingNoStatusField() {
199+
setUpPipelineMocks();
192200
when(jobDynamicObjectRawJsonObject.has("status")).thenReturn(false);
193201
mockDynamicObjectWithMetadata();
194202

@@ -211,6 +219,7 @@ private void mockDynamicObjectWithMetadata() {
211219

212220
@Test
213221
void testEstimateWhenCallToK8sFails() {
222+
setUpPipelineMocks();
214223
when(jobDynamicKubernetesApiResponse.isSuccess()).thenReturn(false);
215224
List<K8sPipelineElementStatus> statuses = estimator.estimateStatuses(pipeline);
216225
K8sPipelineElementStatus status = Iterables.getOnlyElement(statuses);
@@ -221,6 +230,7 @@ void testEstimateWhenCallToK8sFails() {
221230

222231
@Test
223232
void testEstimateWhenK8sReturnsNullObject() {
233+
setUpPipelineMocks();
224234
when(jobDynamicKubernetesApiResponse.getObject()).thenReturn(null);
225235
List<K8sPipelineElementStatus> statuses = estimator.estimateStatuses(pipeline);
226236
K8sPipelineElementStatus status = Iterables.getOnlyElement(statuses);
@@ -229,8 +239,63 @@ void testEstimateWhenK8sReturnsNullObject() {
229239
assertEquals(status.getMessage(), "Returned K8s object is null or has no json");
230240
}
231241

242+
@Test
243+
void testEstimateWhenElementYamlIsMalformed() {
244+
// estimateElementStatus() must catch snakeyaml exceptions and return an unready status rather
245+
// than propagating. estimateElementStatus() is called directly, so setUpPipelineMocks() is not needed here.
246+
K8sPipelineElementStatus status = estimator.estimateElementStatus("malformed: yaml: [unclosed", "test-namespace");
247+
assertFalse(status.isReady());
248+
assertFalse(status.isFailed());
249+
assertTrue(status.getMessage().contains("Failed to parse element YAML"));
250+
}
251+
252+
@Test
253+
void testEstimateWhenElementYamlTriggersScannerException() {
254+
// Intended to trigger snakeyaml's ScannerException (invalid characters or mapping structure, e.g. a colon
255+
// in an unquoted value); estimateElementStatus() must return an unready status. NOTE(review): the exception type itself is not asserted.
256+
K8sPipelineElementStatus status =
257+
estimator.estimateElementStatus("key: value: with: colons: everywhere:", "test-namespace");
258+
assertFalse(status.isReady());
259+
assertFalse(status.isFailed());
260+
assertTrue(status.getMessage().contains("Failed to parse element YAML"));
261+
}
262+
263+
@Test
264+
void testEstimateWhenElementYamlTriggersParserException() {
265+
// Intended to trigger snakeyaml's ParserException via repeated document markers; estimateElementStatus() must
266+
// return an unready status. NOTE(review): repeated '---' markers may surface as a different snakeyaml error (e.g. ComposerException) - confirm; the type is not asserted.
267+
K8sPipelineElementStatus status =
268+
estimator.estimateElementStatus("--- invalid\n--- also invalid\n---", "test-namespace");
269+
assertFalse(status.isReady());
270+
assertFalse(status.isFailed());
271+
assertTrue(status.getMessage().contains("Failed to parse element YAML"));
272+
}
273+
274+
@Test
275+
void testEstimateWhenElementYamlTriggersConstructorException() {
276+
// Intended to trigger snakeyaml's ConstructorException via a type tag (!!java.util.Date) that cannot be constructed.
277+
// estimateElementStatus() must return an unready status. NOTE(review): the exception type itself is not asserted.
278+
K8sPipelineElementStatus status =
279+
estimator.estimateElementStatus("!!java.util.Date 2021-01-01", "test-namespace");
280+
assertFalse(status.isReady());
281+
assertFalse(status.isFailed());
282+
assertTrue(status.getMessage().contains("Failed to parse element YAML"));
283+
}
284+
285+
@Test
286+
void testEstimateWhenElementYamlHasNullMetadata() {
287+
// YAML with only apiVersion and kind (no 'metadata' field) is expected to yield a DynamicKubernetesObject with null metadata.
288+
// estimateElementStatus() must return an unready, non-failed status whose message mentions "null metadata".
289+
K8sPipelineElementStatus status =
290+
estimator.estimateElementStatus("apiVersion: v1\nkind: ConfigMap", "test-namespace");
291+
assertFalse(status.isReady());
292+
assertFalse(status.isFailed());
293+
assertTrue(status.getMessage().contains("null metadata"));
294+
}
295+
232296
@Test
233297
void testEstimateWhenPipelineHasSingleElementWithK8sObjectHavingNoRawJson() {
298+
setUpPipelineMocks();
234299
when(jobDynamicKubernetesApiResponse.getObject()).thenReturn(jobDynamicObject);
235300
when(jobDynamicObject.getRaw()).thenReturn(null);
236301
List<K8sPipelineElementStatus> statuses = estimator.estimateStatuses(pipeline);

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,24 @@ public void apply(String yaml, KubernetesObject owner) throws ApiException {
177177
}
178178

179179
public boolean isReady(String yaml) {
180-
DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
180+
DynamicKubernetesObject obj;
181+
try {
182+
obj = Dynamics.newFromYaml(yaml);
183+
} catch (Exception e) {
184+
log.warn("Failed to parse YAML in isReady check: {}", e.getMessage());
185+
return false;
186+
}
187+
if (obj.getMetadata() == null) {
188+
log.warn("Failed to check isReady: parsed YAML has null metadata.");
189+
return false;
190+
}
181191
String namespace = obj.getMetadata().getNamespace();
182192
String name = obj.getMetadata().getName();
183193
String kind = obj.getKind();
194+
if (namespace == null) {
195+
log.warn("Failed to check isReady {}/{}: namespace is null.", kind, name);
196+
return false;
197+
}
184198
try {
185199
KubernetesApiResponse<DynamicKubernetesObject> existing = apiFor(obj).get(namespace, name);
186200
existing.onFailure((code, status) -> log.warn("Failed to fetch {}/{}: {}.", kind, name, status.getMessage()));
@@ -239,10 +253,24 @@ public static boolean isReady(DynamicKubernetesObject obj) {
239253
}
240254

241255
public boolean isFailed(String yaml) {
242-
DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
256+
DynamicKubernetesObject obj;
257+
try {
258+
obj = Dynamics.newFromYaml(yaml);
259+
} catch (Exception e) {
260+
log.warn("Failed to parse YAML in isFailed check: {}", e.getMessage());
261+
return false;
262+
}
263+
if (obj.getMetadata() == null) {
264+
log.warn("Failed to check isFailed: parsed YAML has null metadata.");
265+
return false;
266+
}
243267
String namespace = obj.getMetadata().getNamespace();
244268
String name = obj.getMetadata().getName();
245269
String kind = obj.getKind();
270+
if (namespace == null) {
271+
log.warn("Failed to check isFailed {}/{}: namespace is null.", kind, name);
272+
return false;
273+
}
246274
try {
247275
KubernetesApiResponse<DynamicKubernetesObject> existing = apiFor(obj).get(namespace, name);
248276
existing.onFailure((code, status) -> log.warn("Failed to fetch {}/{}: {}.", kind, name, status.getMessage()));

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,15 @@ public Result reconcile(Request request) {
144144
status.setFailed(null);
145145
status.setMessage("Planned.");
146146
} catch (Exception e) {
147-
log.error("Encountered error when planning a pipeline for {}/{} with SQL `{}`.", kind, name,
148-
object.getSpec().getSql(), e);
147+
// SqlValidatorException and ValidationException indicate bad user SQL (client error).
148+
// Log at WARN level to avoid false server-error alerts.
149+
if (isSqlValidationError(e)) {
150+
log.warn("SQL validation error when planning a pipeline for {}/{} with SQL `{}`: {}", kind, name,
151+
object.getSpec().getSql(), e.getMessage());
152+
} else {
153+
log.error("Encountered error when planning a pipeline for {}/{} with SQL `{}`.", kind, name,
154+
object.getSpec().getSql(), e);
155+
}
149156

150157
// Mark the Subscription as failed.
151158
status.setFailed(true);
@@ -233,7 +240,17 @@ private static boolean diverged(V1alpha1SubscriptionSpec spec, V1alpha1Subscript
233240

234241
// Fetch attributes from downstream controllers
235242
private Map<String, String> fetchAttributes(String yaml) {
236-
DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
243+
DynamicKubernetesObject obj;
244+
try {
245+
obj = Dynamics.newFromYaml(yaml);
246+
} catch (Exception e) {
247+
log.warn("Failed to parse YAML in fetchAttributes: {}", e.getMessage());
248+
return Collections.emptyMap();
249+
}
250+
if (obj.getMetadata() == null) {
251+
log.warn("Failed to fetch attributes: parsed YAML has null metadata.");
252+
return Collections.emptyMap();
253+
}
237254
String namespace = obj.getMetadata().getNamespace();
238255
String name = obj.getMetadata().getName();
239256
String kind = obj.getKind();
@@ -313,5 +330,19 @@ private static Map<String, String> map(Map<String, String> m) {
313330
return m;
314331
}
315332
}
333+
334+
// Returns true if e, or any throwable reachable through its getCause() chain, has a class name containing
335+
// "SqlValidatorException" or "ValidationException" (treated as bad user SQL rather than a server-side failure); returns false otherwise, including for null.
336+
static boolean isSqlValidationError(Throwable e) {
337+
Throwable x = e;
338+
// Walk the cause chain until it ends.
while (x != null) {
339+
String className = x.getClass().getName();
340+
// NOTE(review): this is a substring match on the class name, so any class whose name contains "ValidationException" also counts - confirm this breadth is intended.
if (className.contains("SqlValidatorException") || className.contains("ValidationException")) {
341+
return true;
342+
}
343+
x = x.getCause();
344+
}
345+
return false;
346+
}
316347
}
317348

0 commit comments

Comments
 (0)