Skip to content

Commit a9eb58d

Browse files
Merge pull request #1402 from cloudsufi/cherry-pick-8cfba2024019705cebc8864492b73d6883f11134
[🍒][PLUGIN-1563] Add support in BQ Source to read JSON columns
2 parents d69a13f + 0a4d769 commit a9eb58d

4 files changed

Lines changed: 24 additions & 2 deletions

File tree

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryDataParser.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,9 @@ private static void addToRecordBuilder(StructuredRecord.Builder recordBuilder, S
159159
*/
160160
public static Object convertValue(Field field, FieldValue fieldValue) {
161161
LegacySQLTypeName type = field.getType();
162-
StandardSQLTypeName standardType = type.getStandardType();
162+
// Treat JSON as string
163+
StandardSQLTypeName standardType = LegacySQLTypeName.valueOf("JSON").equals(type) ?
164+
StandardSQLTypeName.STRING : type.getStandardType();
163165
switch (standardType) {
164166
case TIME:
165167
return LocalTime.parse(fieldValue.getStringValue());

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,9 @@ public static Schema convertFieldType(Field field, @Nullable FailureCollector co
292292
public static Schema convertFieldType(Field field, @Nullable FailureCollector collector,
293293
@Nullable String recordPrefix) {
294294
LegacySQLTypeName type = field.getType();
295-
StandardSQLTypeName standardType = type.getStandardType();
295+
// Treat JSON as string
296+
StandardSQLTypeName standardType = LegacySQLTypeName.valueOf("JSON").equals(type) ?
297+
StandardSQLTypeName.STRING : type.getStandardType();
296298
switch (standardType) {
297299
case FLOAT64:
298300
// float is a float64, so corresponding type becomes double

src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryDataParserTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.cloud.bigquery.FieldList;
2121
import com.google.cloud.bigquery.FieldValue;
2222
import com.google.cloud.bigquery.FieldValueList;
23+
import com.google.cloud.bigquery.LegacySQLTypeName;
2324
import com.google.cloud.bigquery.StandardSQLTypeName;
2425
import com.google.cloud.bigquery.TableResult;
2526
import com.google.common.io.BaseEncoding;
@@ -159,4 +160,13 @@ private String paddHeaddingZero(String value, int length) {
159160
}
160161
return Strings.repeat('0', length - value.length()) + value;
161162
}
163+
164+
@Test
165+
public void testJsonFieldConversionToString() {
166+
Field field = Field.newBuilder("demo", LegacySQLTypeName.valueOf("JSON")).build();
167+
String jsonValue = "{\"key\":\"value\"}";
168+
FieldValue fieldValue = FieldValue.of(FieldValue.Attribute.PRIMITIVE, jsonValue);
169+
Object result = BigQueryDataParser.convertValue(field, fieldValue);
170+
Assert.assertEquals(jsonValue, result);
171+
}
162172
}

src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.plugin.gcp.bigquery.util;
1818

1919
import com.google.cloud.bigquery.Field;
20+
import com.google.cloud.bigquery.LegacySQLTypeName;
2021
import com.google.cloud.bigquery.StandardSQLTypeName;
2122
import io.cdap.cdap.api.data.schema.Schema;
2223
import io.cdap.cdap.etl.api.FailureCollector;
@@ -470,4 +471,11 @@ public void testJobLabelWithValueStartingWithCaptialLetter() {
470471
collector.getValidationFailures().get(0).getMessage());
471472
}
472473

474+
@Test
475+
public void testConvertFieldTypeJsonToString() {
476+
Field field = Field.newBuilder("demo", LegacySQLTypeName.valueOf("JSON")).build();
477+
Schema expectedSchema = Schema.of(Schema.Type.STRING);
478+
Schema result = BigQueryUtil.convertFieldType(field, null, null);
479+
Assert.assertEquals(expectedSchema, result);
480+
}
473481
}

0 commit comments

Comments
 (0)