Skip to content

Commit 4bc782f

Browse files
jesrypandawajesry.pandawa
andauthored
fix: fix nullpointer on empty bq default columns (#22)
* fix: fix nullpointer on empty bq default columns * chore: remove unused tablePartitionKey Co-authored-by: jesry.pandawa <jesry.pandawa@go-jek.com>
1 parent e29ee24 commit 4bc782f

2 files changed

Lines changed: 9 additions & 25 deletions

File tree

src/main/java/io/odpf/depot/bigquery/handler/JsonErrorHandler.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import java.util.List;
1818
import java.util.Map;
1919
import java.util.Map.Entry;
20-
import java.util.Optional;
2120
import java.util.Set;
2221
import java.util.stream.Collectors;
2322

@@ -28,8 +27,6 @@ the job of the class is to handle unknown field errors and then update the bq ta
2827
public class JsonErrorHandler implements ErrorHandler {
2928

3029
private final BigQueryClient bigQueryClient;
31-
private final String tablePartitionKey;
32-
private final Optional<LegacySQLTypeName> partitionKeyDataType;
3330
private final boolean castAllColumnsToStringDataType;
3431
private final Map<String, String> metadataColumnsTypesMap;
3532
private final String bqMetadataNamespace;
@@ -40,15 +37,9 @@ public JsonErrorHandler(BigQueryClient bigQueryClient, BigQuerySinkConfig bigQue
4037

4138
this.instrumentation = instrumentation;
4239
this.bigQueryClient = bigQueryClient;
43-
tablePartitionKey = bigQuerySinkConfig.isTablePartitioningEnabled() ? bigQuerySinkConfig.getTablePartitionKey() : "";
4440
defaultColumnsMap = bigQuerySinkConfig.getSinkBigqueryDefaultColumns()
4541
.stream()
4642
.collect(Collectors.toMap(TupleString::getFirst, TupleString::getSecond));
47-
if (bigQuerySinkConfig.isTablePartitioningEnabled()) {
48-
partitionKeyDataType = Optional.of(LegacySQLTypeName.valueOfStrict(defaultColumnsMap.get(tablePartitionKey).toUpperCase()));
49-
} else {
50-
partitionKeyDataType = Optional.empty();
51-
}
5243
castAllColumnsToStringDataType = bigQuerySinkConfig.getSinkBigqueryDefaultDatatypeStringEnable();
5344
bqMetadataNamespace = bigQuerySinkConfig.getBqMetadataNamespace();
5445
if (!bigQuerySinkConfig.shouldAddMetadata()) {
@@ -103,11 +94,11 @@ private List<BigQueryError> getBqErrorsWithNoSuchFields(List<BigQueryError> valu
10394
).collect(Collectors.toList());
10495
}
10596

97+
/**
98+
* This method only used for unknown fields.
99+
*/
106100

107101
private Field getField(String key) {
108-
if (!tablePartitionKey.isEmpty() && tablePartitionKey.equals(key) && partitionKeyDataType.isPresent()) {
109-
return Field.of(key, partitionKeyDataType.get());
110-
}
111102
if (!bqMetadataNamespace.isEmpty()) {
112103
throw new UnsupportedOperationException("metadata namespace is not supported, because nested json structure is not supported");
113104
}

src/main/java/io/odpf/depot/bigquery/json/BigqueryJsonUpdateListener.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -93,29 +93,22 @@ private void addMetadataFields(HashSet<Field> fieldsToBeUpdated, List<TupleStrin
9393
private Field getField(TupleString tupleString) {
9494
String fieldName = tupleString.getFirst();
9595
LegacySQLTypeName fieldDataType = LegacySQLTypeName.valueOfStrict(tupleString.getSecond().toUpperCase());
96-
97-
if (isValidPartitionField(fieldName, fieldDataType)) {
98-
return Field.of(fieldName, fieldDataType);
99-
}
100-
101-
return Field.of(fieldName, fieldDataType);
96+
return checkAndCreateField(fieldName, fieldDataType);
10297
}
10398

10499
/**
105-
* Range Bigquery partitioning is not supported, supported paritition fields have to be of DATE or TIMESTAMP type..
100+
* Range BigQuery partitioning is not supported, supported partition fields have to be of DATE or TIMESTAMP type..
106101
*/
107-
private boolean isValidPartitionField(String fieldName, LegacySQLTypeName fieldDataType) {
102+
private Field checkAndCreateField(String fieldName, LegacySQLTypeName fieldDataType) {
108103
Boolean isPartitioningEnabled = config.isTablePartitioningEnabled();
109104
if (!isPartitioningEnabled) {
110-
return false;
105+
return Field.of(fieldName, fieldDataType);
111106
}
112107
String partitionKey = config.getTablePartitionKey();
113-
114108
boolean isValidPartitionDataType = (fieldDataType == LegacySQLTypeName.TIMESTAMP || fieldDataType == LegacySQLTypeName.DATE);
115109
if (partitionKey.equals(fieldName) && !isValidPartitionDataType) {
116-
throw new UnsupportedOperationException(" supported paritition fields have to be of DATE or TIMESTAMP type..");
110+
throw new UnsupportedOperationException("supported partition fields have to be of DATE or TIMESTAMP type..");
117111
}
118-
119-
return true;
112+
return Field.of(fieldName, fieldDataType);
120113
}
121114
}

0 commit comments

Comments
 (0)