5858
5959import java .io .IOException ;
6060import java .time .Duration ;
61+ import java .util .Arrays ;
6162import java .util .Collections ;
6263import java .util .Map ;
6364import java .util .Set ;
65+ import java .util .stream .Collectors ;
6466import javax .annotation .Nullable ;
6567
6668/**
@@ -111,6 +113,7 @@ public void run(ActionContext context) throws Exception {
111113 String datasetProjectId = config .getDatasetProject ();
112114 if (config .getStoreResults () && datasetProjectId != null && datasetName != null && tableName != null ) {
113115 builder .setDestinationTable (TableId .of (datasetProjectId , datasetName , tableName ));
116+ builder .setWriteDisposition (JobInfo .WriteDisposition .valueOf (config .getWritePreference ()));
114117 }
115118
116119 // Enable or Disable the query cache to force live query evaluation.
@@ -274,6 +277,7 @@ public static final class Config extends AbstractBigQueryActionConfig {
274277 private static final String SQL = "sql" ;
275278 private static final String DATASET = "dataset" ;
276279 private static final String TABLE = "table" ;
280+ private static final String WRITE_PREFERENCE = "writePreference" ;
277281 private static final String NAME_LOCATION = "location" ;
278282 public static final String NAME_BQ_JOB_LABELS = "jobLabels" ;
279283 private static final int ERROR_CODE_NOT_FOUND = 404 ;
@@ -290,6 +294,8 @@ public static final class Config extends AbstractBigQueryActionConfig {
290294 // Sn = a * (1 - r^n) / (r - 1)
291295 public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L ;
292296 public static final int DEFAULT_READ_TIMEOUT = 120 ;
297+ public static final Set <String > VALID_WRITE_PREFERENCES = Arrays .stream (JobInfo .WriteDisposition .values ())
298+ .map (Enum ::name ).collect (Collectors .toSet ());
293299
294300 @ Description ("Dialect of the SQL command. The value must be 'legacy' or 'standard'. " +
295301 "If set to 'standard', the query will use BigQuery's standard SQL: " +
@@ -398,13 +404,20 @@ public static final class Config extends AbstractBigQueryActionConfig {
398404 @ Description ("Timeout in seconds to read data from an established HTTP connection (Default value is 120)." )
399405 private Integer readTimeout ;
400406
407+ @ Name (WRITE_PREFERENCE )
408+ @ Nullable
409+ @ Macro
410+ @ Description ("Specifies if a job should overwrite or append the existing destination table if it already exists." )
411+ private String writePreference ;
412+
401413 private Config (@ Nullable String project , @ Nullable String serviceAccountType , @ Nullable String serviceFilePath ,
402414 @ Nullable String serviceAccountJson , @ Nullable String dataset , @ Nullable String table ,
403415 @ Nullable String location , @ Nullable String cmekKey , @ Nullable String dialect , @ Nullable String sql ,
404416 @ Nullable String mode , @ Nullable Boolean storeResults , @ Nullable String jobLabelKeyValue ,
405417 @ Nullable String rowAsArguments , @ Nullable Boolean retryOnBackendError ,
406418 @ Nullable Long initialRetryDuration , @ Nullable Long maxRetryDuration ,
407- @ Nullable Double retryMultiplier , @ Nullable Integer maxRetryCount , @ Nullable Integer readTimeout ) {
419+ @ Nullable Double retryMultiplier , @ Nullable Integer maxRetryCount , @ Nullable Integer readTimeout ,
420+ @ Nullable String writePreference ) {
408421 this .project = project ;
409422 this .serviceAccountType = serviceAccountType ;
410423 this .serviceFilePath = serviceFilePath ;
@@ -425,6 +438,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
425438 this .maxRetryCount = maxRetryCount ;
426439 this .retryMultiplier = retryMultiplier ;
427440 this .readTimeout = readTimeout ;
441+ this .writePreference = writePreference ;
428442 }
429443
430444 public boolean isLegacySQL () {
@@ -451,6 +465,11 @@ public Boolean getStoreResults() {
451465 return storeResults == null || storeResults ;
452466 }
453467
468+ public String getWritePreference () {
469+ String defaultPreference = JobInfo .WriteDisposition .WRITE_EMPTY .name ();
470+ return Strings .isNullOrEmpty (writePreference ) ? defaultPreference : writePreference .toUpperCase ();
471+ }
472+
454473 public QueryJobConfiguration .Priority getMode () {
455474 return QueryJobConfiguration .Priority .valueOf (mode .toUpperCase ());
456475 }
@@ -546,9 +565,25 @@ public void validate(FailureCollector failureCollector, Map<String, String> argu
546565 validateJobLabelKeyValue (failureCollector );
547566 }
548567
568+ if (!containsMacro (WRITE_PREFERENCE )) {
569+ validateWritePreference (failureCollector , getWritePreference ());
570+ }
571+
549572 failureCollector .getOrThrowException ();
550573 }
551574
575+ void validateWritePreference (FailureCollector failureCollector , String writePreference ) {
576+ if (!VALID_WRITE_PREFERENCES .contains (writePreference )) {
577+ failureCollector .addFailure (
578+ String .format ("Invalid write preference '%s'. Allowed values are '%s'." ,
579+ writePreference , VALID_WRITE_PREFERENCES .toString ()
580+ ),
581+ "Please provide a valid write preference."
582+ )
583+ .withConfigProperty (WRITE_PREFERENCE );
584+ }
585+ }
586+
552587 void validateJobLabelKeyValue (FailureCollector failureCollector ) {
553588 BigQueryUtil .validateJobLabelKeyValue (jobLabelKeyValue , failureCollector , NAME_BQ_JOB_LABELS );
554589 // Verify retry configuration when retry on backend error is enabled and none of the retry configuration
@@ -677,6 +712,7 @@ public static class Builder {
677712 private Integer maxRetryCount ;
678713 private Double retryMultiplier ;
679714 private Integer readTimeout ;
715+ private String writePreference ;
680716
681717 public Builder setProject (@ Nullable String project ) {
682718 this .project = project ;
@@ -778,6 +814,11 @@ public Builder setReadTimeout(@Nullable Integer readTimeout) {
778814 return this ;
779815 }
780816
817+ public Builder setWritePreference (@ Nullable String writePreference ) {
818+ this .writePreference = writePreference ;
819+ return this ;
820+ }
821+
781822 public Config build () {
782823 return new Config (
783824 project ,
@@ -799,7 +840,8 @@ public Config build() {
799840 maxRetryDuration ,
800841 retryMultiplier ,
801842 maxRetryCount ,
802- readTimeout
843+ readTimeout ,
844+ writePreference
803845 );
804846 }
805847 }
0 commit comments