@@ -125,7 +125,7 @@ public void run(ActionContext context) throws Exception {
125125 Credentials credentials = config .getServiceAccount () == null ?
126126 null : GCPUtils .loadServiceAccountCredentials (config .getServiceAccount (),
127127 config .isServiceAccountFilePath ());
128- BigQuery bigQuery = GCPUtils .getBigQuery (config .getProject (), credentials );
128+ BigQuery bigQuery = GCPUtils .getBigQuery (config .getProject (), credentials , config . getReadTimeout () );
129129 //create dataset to store the results if not exists
130130 if (config .getStoreResults () && !Strings .isNullOrEmpty (datasetName ) &&
131131 !Strings .isNullOrEmpty (tableName )) {
@@ -283,11 +283,13 @@ public static final class Config extends AbstractBigQueryActionConfig {
283283 private static final String NAME_MAX_RETRY_DURATION = "maxRetryDuration" ;
284284 private static final String NAME_RETRY_MULTIPLIER = "retryMultiplier" ;
285285 private static final String NAME_MAX_RETRY_COUNT = "maxRetryCount" ;
286+ private static final String NAME_READ_TIMEOUT = "readTimeout" ;
286287 public static final long DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 1L ;
287288 public static final double DEFAULT_RETRY_MULTIPLIER = 2.0 ;
288289 public static final int DEFAULT_MAX_RETRY_COUNT = 5 ;
289290 // Sn = a * (1 - r^n) / (r - 1)
290291 public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L ;
292+ public static final int DEFAULT_READ_TIMEOUT = 120 ;
291293
292294 @ Description ("Dialect of the SQL command. The value must be 'legacy' or 'standard'. " +
293295 "If set to 'standard', the query will use BigQuery's standard SQL: " +
@@ -390,13 +392,19 @@ public static final class Config extends AbstractBigQueryActionConfig {
390392 "are reserved keys and cannot be used as label keys." )
391393 protected String jobLabelKeyValue ;
392394
395+ @ Name (NAME_READ_TIMEOUT )
396+ @ Nullable
397+ @ Macro
398+ @ Description ("Timeout in seconds to read data from an established HTTP connection (Default value is 120)." )
399+ private Integer readTimeout ;
400+
393401 private Config (@ Nullable String project , @ Nullable String serviceAccountType , @ Nullable String serviceFilePath ,
394402 @ Nullable String serviceAccountJson , @ Nullable String dataset , @ Nullable String table ,
395403 @ Nullable String location , @ Nullable String cmekKey , @ Nullable String dialect , @ Nullable String sql ,
396404 @ Nullable String mode , @ Nullable Boolean storeResults , @ Nullable String jobLabelKeyValue ,
397405 @ Nullable String rowAsArguments , @ Nullable Boolean retryOnBackendError ,
398406 @ Nullable Long initialRetryDuration , @ Nullable Long maxRetryDuration ,
399- @ Nullable Double retryMultiplier , @ Nullable Integer maxRetryCount ) {
407+ @ Nullable Double retryMultiplier , @ Nullable Integer maxRetryCount , @ Nullable Integer readTimeout ) {
400408 this .project = project ;
401409 this .serviceAccountType = serviceAccountType ;
402410 this .serviceFilePath = serviceFilePath ;
@@ -416,6 +424,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
416424 this .maxRetryDuration = maxRetryDuration ;
417425 this .maxRetryCount = maxRetryCount ;
418426 this .retryMultiplier = retryMultiplier ;
427+ this .readTimeout = readTimeout ;
419428 }
420429
421430 public boolean isLegacySQL () {
@@ -481,6 +490,10 @@ public int getMaxRetryCount() {
481490 return maxRetryCount == null ? DEFAULT_MAX_RETRY_COUNT : maxRetryCount ;
482491 }
483492
493+ public int getReadTimeout () {
494+ return readTimeout == null ? DEFAULT_READ_TIMEOUT : readTimeout ;
495+ }
496+
484497 @ Override
485498 public void validate (FailureCollector failureCollector ) {
486499 validate (failureCollector , Collections .emptyMap ());
@@ -544,14 +557,15 @@ void validateJobLabelKeyValue(FailureCollector failureCollector) {
544557 !containsMacro (NAME_INITIAL_RETRY_DURATION ) && !containsMacro (NAME_MAX_RETRY_DURATION ) &&
545558 !containsMacro (NAME_MAX_RETRY_COUNT ) && !containsMacro (NAME_RETRY_MULTIPLIER )) {
546559 validateRetryConfiguration (
547- failureCollector , initialRetryDuration , maxRetryDuration , maxRetryCount , retryMultiplier
560+ failureCollector , initialRetryDuration , maxRetryDuration , maxRetryCount , retryMultiplier , readTimeout
548561 );
549562 }
550563 failureCollector .getOrThrowException ();
551564 }
552565
553566 void validateRetryConfiguration (FailureCollector failureCollector , Long initialRetryDuration ,
554- Long maxRetryDuration , Integer maxRetryCount , Double retryMultiplier ) {
567+ Long maxRetryDuration , Integer maxRetryCount , Double retryMultiplier ,
568+ Integer readTimeout ) {
555569 if (initialRetryDuration != null && initialRetryDuration <= 0 ) {
556570 failureCollector .addFailure ("Initial retry duration must be greater than 0." ,
557571 "Please specify a valid initial retry duration." )
@@ -577,6 +591,11 @@ void validateRetryConfiguration(FailureCollector failureCollector, Long initialR
577591 "Please specify a valid max retry duration." )
578592 .withConfigProperty (NAME_MAX_RETRY_DURATION );
579593 }
594+ if (readTimeout != null && readTimeout <= 0 ) {
595+ failureCollector .addFailure ("Read timeout must be greater than 0." ,
596+ "Please specify a valid read timeout" )
597+ .withConfigProperty (NAME_READ_TIMEOUT );
598+ }
580599 }
581600
582601 void validateCmekKey (FailureCollector failureCollector , Map <String , String > arguments ) {
@@ -612,7 +631,8 @@ public void validateSQLSyntax(FailureCollector failureCollector, BigQuery bigQue
612631 } else {
613632 errorMessage = e .getMessage ();
614633 }
615- failureCollector .addFailure (String .format ("%s." , errorMessage ), "Please specify a valid query." )
634+ failureCollector .addFailure (String .format ("%s. Error code: %s." , errorMessage , e .getCode ()),
635+ "Please specify a valid query." )
616636 .withConfigProperty (SQL );
617637 }
618638 }
@@ -626,7 +646,7 @@ private BigQuery getBigQuery(FailureCollector failureCollector) {
626646 failureCollector .addFailure (e .getMessage (), null );
627647 failureCollector .getOrThrowException ();
628648 }
629- return GCPUtils .getBigQuery (getProject (), credentials );
649+ return GCPUtils .getBigQuery (getProject (), credentials , getReadTimeout () );
630650 }
631651
632652 public static Builder builder () {
@@ -656,6 +676,7 @@ public static class Builder {
656676 private Long maxRetryDuration ;
657677 private Integer maxRetryCount ;
658678 private Double retryMultiplier ;
679+ private Integer readTimeout ;
659680
660681 public Builder setProject (@ Nullable String project ) {
661682 this .project = project ;
@@ -752,6 +773,11 @@ public Builder setRetryMultiplier(@Nullable Double retryMultiplier) {
752773 return this ;
753774 }
754775
776+ public Builder setReadTimeout (@ Nullable Integer readTimeout ) {
777+ this .readTimeout = readTimeout ;
778+ return this ;
779+ }
780+
755781 public Config build () {
756782 return new Config (
757783 project ,
@@ -772,7 +798,8 @@ public Config build() {
772798 initialRetryDuration ,
773799 maxRetryDuration ,
774800 retryMultiplier ,
775- maxRetryCount
801+ maxRetryCount ,
802+ readTimeout
776803 );
777804 }
778805 }
0 commit comments