Skip to content

Commit 75c1ac0

Browse files
Merge pull request #16 from cloudsufi/cdap-1682
[PLUGIN-1682] [cherrypicked]Fixed null output schema in case of macros for connection properties
2 parents 75a10c9 + 9ef53ef commit 75c1ac0

2 files changed

Lines changed: 47 additions & 6 deletions

File tree

src/main/java/io/cdap/plugin/snowflake/common/util/SchemaHelper.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ private SchemaHelper() {
5858

5959
public static Schema getSchema(SnowflakeBatchSourceConfig config, FailureCollector collector) {
6060
if (!config.canConnect()) {
61-
return null;
61+
return getParsedSchema(config.getSchema());
6262
}
6363

6464
SnowflakeSourceAccessor snowflakeSourceAccessor = new SnowflakeSourceAccessor(config);
@@ -69,11 +69,7 @@ public static Schema getSchema(SnowflakeSourceAccessor snowflakeAccessor, String
6969
FailureCollector collector, String importQuery) {
7070
try {
7171
if (!Strings.isNullOrEmpty(schema)) {
72-
try {
73-
return Schema.parseJson(schema);
74-
} catch (IOException | IllegalStateException e) {
75-
throw new SchemaParseException(e);
76-
}
72+
return getParsedSchema(schema);
7773
}
7874
return Strings.isNullOrEmpty(importQuery) ? null : getSchema(snowflakeAccessor, importQuery);
7975
} catch (SchemaParseException e) {
@@ -85,6 +81,17 @@ public static Schema getSchema(SnowflakeSourceAccessor snowflakeAccessor, String
8581
}
8682
}
8783

84+
private static Schema getParsedSchema(String schema) {
85+
if (Strings.isNullOrEmpty(schema)) {
86+
return null;
87+
}
88+
try {
89+
return Schema.parseJson(schema);
90+
} catch (IOException | IllegalStateException e) {
91+
throw new SchemaParseException(e);
92+
}
93+
}
94+
8895
public static Schema getSchema(SnowflakeAccessor snowflakeAccessor, String importQuery) {
8996
try {
9097
List<SnowflakeFieldDescriptor> result = snowflakeAccessor.describeQuery(importQuery);

src/test/java/io/cdap/plugin/snowflake/common/util/SchemaHelperTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.junit.Assert;
2626
import org.junit.Test;
2727
import org.mockito.Mockito;
28+
2829
import java.io.IOException;
2930
import java.sql.Types;
3031
import java.util.ArrayList;
@@ -144,4 +145,37 @@ public void testGetSchemaFromSnowflake() throws IOException {
144145
Assert.assertTrue(collector.getValidationFailures().isEmpty());
145146
Assert.assertEquals(expected, actual);
146147
}
148+
149+
@Test
150+
public void testGetSchemaWhenMacroIsEnabled() {
151+
Schema expected = Schema.recordOf("test",
152+
Schema.Field.of("test_field", Schema.nullableOf(Schema.of(Schema.Type.LONG)))
153+
);
154+
155+
SnowflakeBatchSourceConfig mockConfig = Mockito.mock(SnowflakeBatchSourceConfig.class);
156+
Mockito.when(mockConfig.canConnect()).thenReturn(false);
157+
Mockito.when(mockConfig.getSchema()).thenReturn(expected.toString());
158+
159+
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
160+
Schema actual = SchemaHelper.getSchema(mockConfig, collector);
161+
162+
Assert.assertTrue(collector.getValidationFailures().isEmpty());
163+
Assert.assertEquals(expected, actual);
164+
165+
}
166+
167+
@Test
168+
public void testGetSchemaWhenMacroIsEnabledSchemaIsNull() {
169+
170+
SnowflakeBatchSourceConfig mockConfig = Mockito.mock(SnowflakeBatchSourceConfig.class);
171+
Mockito.when(mockConfig.canConnect()).thenReturn(false);
172+
Mockito.when(mockConfig.getSchema()).thenReturn(null);
173+
174+
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
175+
Schema actual = SchemaHelper.getSchema(mockConfig, collector);
176+
177+
Assert.assertTrue(collector.getValidationFailures().isEmpty());
178+
Assert.assertNull(actual);
179+
180+
}
147181
}

0 commit comments

Comments
 (0)