Skip to content

Commit a467a7a

Browse files
committed
Added support for destination table write preference in BQ Execute Plugin
Addressed PR comments: updated description, added documentation for the added feature Formatted to fix failing tests Refactor: Replace hardcoded write preference strings with JobInfo.WriteDisposition constants Refactor: Simplify error message for invalid write preference validation Refactor: Centralize valid write preferences in BigQueryExecute.Config Refactor: Dynamically generate valid write preferences from JobInfo.WriteDisposition
1 parent 601f62b commit a467a7a

4 files changed

Lines changed: 97 additions & 2 deletions

File tree

docs/BigQueryExecute-action.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ cache that will be flushed whenever tables in the query are modified.
6262
It is only applicable when users choose to store the query results in a BigQuery table.
6363
More information can be found [here](https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys)
6464

65+
**Destination Table Write Preference**: Provides the following options as write preferences for the destination table:
66+
* **Write if Empty**: Only write if the table is empty
67+
* **Append to Table**: Add results to existing data. Schema should match.
68+
* **Overwrite Table**: Replace all existing data. Schema will also be overriden.
69+
6570
**Row As Arguments**: Row as arguments. For example, if the query is 'select min(id) as min_id, max(id) as max_id from my_dataset.my_table',
6671
an arguments for 'min_id' and 'max_id' will be set based on the query results. Plugins further down the pipeline can then
6772
reference these values with macros ${min_id} and ${max_id}.

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@
5858

5959
import java.io.IOException;
6060
import java.time.Duration;
61+
import java.util.Arrays;
6162
import java.util.Collections;
6263
import java.util.Map;
6364
import java.util.Set;
65+
import java.util.stream.Collectors;
6466
import javax.annotation.Nullable;
6567

6668
/**
@@ -111,6 +113,7 @@ public void run(ActionContext context) throws Exception {
111113
String datasetProjectId = config.getDatasetProject();
112114
if (config.getStoreResults() && datasetProjectId != null && datasetName != null && tableName != null) {
113115
builder.setDestinationTable(TableId.of(datasetProjectId, datasetName, tableName));
116+
builder.setWriteDisposition(JobInfo.WriteDisposition.valueOf(config.getWritePreference()));
114117
}
115118

116119
// Enable or Disable the query cache to force live query evaluation.
@@ -274,6 +277,7 @@ public static final class Config extends AbstractBigQueryActionConfig {
274277
private static final String SQL = "sql";
275278
private static final String DATASET = "dataset";
276279
private static final String TABLE = "table";
280+
private static final String WRITE_PREFERENCE = "writePreference";
277281
private static final String NAME_LOCATION = "location";
278282
public static final String NAME_BQ_JOB_LABELS = "jobLabels";
279283
private static final int ERROR_CODE_NOT_FOUND = 404;
@@ -290,6 +294,8 @@ public static final class Config extends AbstractBigQueryActionConfig {
290294
// Sn = a * (1 - r^n) / (r - 1)
291295
public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L;
292296
public static final int DEFAULT_READ_TIMEOUT = 120;
297+
public static final Set<String> VALID_WRITE_PREFERENCES = Arrays.stream(JobInfo.WriteDisposition.values())
298+
.map(Enum::name).collect(Collectors.toSet());
293299

294300
@Description("Dialect of the SQL command. The value must be 'legacy' or 'standard'. " +
295301
"If set to 'standard', the query will use BigQuery's standard SQL: " +
@@ -398,13 +404,20 @@ public static final class Config extends AbstractBigQueryActionConfig {
398404
@Description("Timeout in seconds to read data from an established HTTP connection (Default value is 120).")
399405
private Integer readTimeout;
400406

407+
@Name(WRITE_PREFERENCE)
408+
@Nullable
409+
@Macro
410+
@Description("Specifies if a job should overwrite or append the existing destination table if it already exists.")
411+
private String writePreference;
412+
401413
private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath,
402414
@Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table,
403415
@Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql,
404416
@Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue,
405417
@Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError,
406418
@Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration,
407-
@Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout) {
419+
@Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout,
420+
@Nullable String writePreference) {
408421
this.project = project;
409422
this.serviceAccountType = serviceAccountType;
410423
this.serviceFilePath = serviceFilePath;
@@ -425,6 +438,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
425438
this.maxRetryCount = maxRetryCount;
426439
this.retryMultiplier = retryMultiplier;
427440
this.readTimeout = readTimeout;
441+
this.writePreference = writePreference;
428442
}
429443

430444
public boolean isLegacySQL() {
@@ -451,6 +465,11 @@ public Boolean getStoreResults() {
451465
return storeResults == null || storeResults;
452466
}
453467

468+
public String getWritePreference() {
469+
String defaultPreference = JobInfo.WriteDisposition.WRITE_EMPTY.name();
470+
return Strings.isNullOrEmpty(writePreference) ? defaultPreference : writePreference.toUpperCase();
471+
}
472+
454473
public QueryJobConfiguration.Priority getMode() {
455474
return QueryJobConfiguration.Priority.valueOf(mode.toUpperCase());
456475
}
@@ -546,9 +565,25 @@ public void validate(FailureCollector failureCollector, Map<String, String> argu
546565
validateJobLabelKeyValue(failureCollector);
547566
}
548567

568+
if (!containsMacro(WRITE_PREFERENCE)) {
569+
validateWritePreference(failureCollector, getWritePreference());
570+
}
571+
549572
failureCollector.getOrThrowException();
550573
}
551574

575+
void validateWritePreference(FailureCollector failureCollector, String writePreference) {
576+
if (!VALID_WRITE_PREFERENCES.contains(writePreference)) {
577+
failureCollector.addFailure(
578+
String.format("Invalid write preference '%s'. Allowed values are '%s'.",
579+
writePreference, VALID_WRITE_PREFERENCES.toString()
580+
),
581+
"Please provide a valid write preference."
582+
)
583+
.withConfigProperty(WRITE_PREFERENCE);
584+
}
585+
}
586+
552587
void validateJobLabelKeyValue(FailureCollector failureCollector) {
553588
BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
554589
// Verify retry configuration when retry on backend error is enabled and none of the retry configuration
@@ -677,6 +712,7 @@ public static class Builder {
677712
private Integer maxRetryCount;
678713
private Double retryMultiplier;
679714
private Integer readTimeout;
715+
private String writePreference;
680716

681717
public Builder setProject(@Nullable String project) {
682718
this.project = project;
@@ -778,6 +814,11 @@ public Builder setReadTimeout(@Nullable Integer readTimeout) {
778814
return this;
779815
}
780816

817+
public Builder setWritePreference(@Nullable String writePreference) {
818+
this.writePreference = writePreference;
819+
return this;
820+
}
821+
781822
public Config build() {
782823
return new Config(
783824
project,
@@ -799,7 +840,8 @@ public Config build() {
799840
maxRetryDuration,
800841
retryMultiplier,
801842
maxRetryCount,
802-
readTimeout
843+
readTimeout,
844+
writePreference
803845
);
804846
}
805847
}

src/test/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecuteTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.bigquery.JobStatus;
2626
import com.google.cloud.bigquery.QueryJobConfiguration;
2727
import com.google.cloud.bigquery.TableResult;
28+
import com.google.common.collect.ImmutableSet;
2829
import io.cdap.cdap.api.metrics.Metrics;
2930
import io.cdap.cdap.etl.api.StageMetrics;
3031
import io.cdap.cdap.etl.api.action.ActionContext;
@@ -37,6 +38,7 @@
3738
import org.mockito.Mock;
3839
import org.mockito.Mockito;
3940
import org.mockito.MockitoAnnotations;
41+
import java.util.Set;
4042

4143
public class BigQueryExecuteTest {
4244
@Mock
@@ -226,5 +228,24 @@ public void testValidateRetryConfigurationWithInvalidReadTimeout() {
226228
failureCollector.getValidationFailures().get(0).getMessage());
227229
}
228230

231+
@Test
232+
public void testValidateWritePreferenceWithInvalidValue() {
233+
config.validateWritePreference(failureCollector, "INVALID_PREFERENCE");
234+
235+
// Assert validation failure
236+
Assert.assertEquals(1, failureCollector.getValidationFailures().size());
237+
Assert.assertEquals(
238+
String.format("Invalid write preference 'INVALID_PREFERENCE'. Allowed values are '%s'.",
239+
config.VALID_WRITE_PREFERENCES.toString()
240+
),
241+
failureCollector.getValidationFailures().get(0).getMessage()
242+
);
243+
}
244+
245+
@Test
246+
public void testDefaultWritePreference() {
247+
Assert.assertEquals(JobInfo.WriteDisposition.WRITE_EMPTY.name(), config.getWritePreference());
248+
}
249+
229250
}
230251

widgets/BigQueryExecute-action.json

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,29 @@
150150
"placeholder": "projects/<gcp-project-id>/locations/<key-location>/keyRings/<key-ring-name>/cryptoKeys/<key-name>"
151151
}
152152
},
153+
{
154+
"widget-type": "radio-group",
155+
"label": "Destination Table Write Preference",
156+
"name": "writePreference",
157+
"widget-attributes": {
158+
"layout": "vertical",
159+
"default": "WRITE_EMPTY",
160+
"options": [
161+
{
162+
"id": "WRITE_EMPTY",
163+
"label": "Write if Empty"
164+
},
165+
{
166+
"id": "WRITE_APPEND",
167+
"label": "Append to Table"
168+
},
169+
{
170+
"id": "WRITE_TRUNCATE",
171+
"label": "Overwrite Table"
172+
}
173+
]
174+
}
175+
},
153176
{
154177
"name": "rowAsArguments",
155178
"label": "Row As Arguments",
@@ -313,6 +336,10 @@
313336
{
314337
"type": "property",
315338
"name": "cmekKey"
339+
},
340+
{
341+
"type": "property",
342+
"name": "writePreference"
316343
}
317344
]
318345
}

0 commit comments

Comments
 (0)