Skip to content

Commit 6ce1e5d

Browse files
authored
Merge pull request #1430 from data-integrations/bugfix/PLUGIN-1800-bq-execute-timeout-increase
PLUGIN-1800: Adding read timeout to BQ Execute Config with default 120s
2 parents 169a3f4 + 4cb7244 commit 6ce1e5d

24 files changed

Lines changed: 110 additions & 43 deletions

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void run(ActionContext context) throws Exception {
8080
Credentials credentials = config.getServiceAccount() == null ?
8181
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
8282
config.isServiceAccountFilePath());
83-
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials);
83+
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
8484
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
8585

8686
LOG.info("Executing SQL as job {}.", jobId.getJob());

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void run(ActionContext context) throws Exception {
125125
Credentials credentials = config.getServiceAccount() == null ?
126126
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
127127
config.isServiceAccountFilePath());
128-
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials);
128+
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, config.getReadTimeout());
129129
//create dataset to store the results if not exists
130130
if (config.getStoreResults() && !Strings.isNullOrEmpty(datasetName) &&
131131
!Strings.isNullOrEmpty(tableName)) {
@@ -283,11 +283,13 @@ public static final class Config extends AbstractBigQueryActionConfig {
283283
private static final String NAME_MAX_RETRY_DURATION = "maxRetryDuration";
284284
private static final String NAME_RETRY_MULTIPLIER = "retryMultiplier";
285285
private static final String NAME_MAX_RETRY_COUNT = "maxRetryCount";
286+
private static final String NAME_READ_TIMEOUT = "readTimeout";
286287
public static final long DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 1L;
287288
public static final double DEFAULT_RETRY_MULTIPLIER = 2.0;
288289
public static final int DEFAULT_MAX_RETRY_COUNT = 5;
289290
// Sn = a * (1 - r^n) / (r - 1)
290291
public static final long DEFULT_MAX_RETRY_DURATION_SECONDS = 63L;
292+
public static final int DEFAULT_READ_TIMEOUT = 120;
291293

292294
@Description("Dialect of the SQL command. The value must be 'legacy' or 'standard'. " +
293295
"If set to 'standard', the query will use BigQuery's standard SQL: " +
@@ -390,13 +392,19 @@ public static final class Config extends AbstractBigQueryActionConfig {
390392
"are reserved keys and cannot be used as label keys.")
391393
protected String jobLabelKeyValue;
392394

395+
@Name(NAME_READ_TIMEOUT)
396+
@Nullable
397+
@Macro
398+
@Description("Timeout in seconds to read data from an established HTTP connection (Default value is 120).")
399+
private Integer readTimeout;
400+
393401
private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath,
394402
@Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table,
395403
@Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql,
396404
@Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue,
397405
@Nullable String rowAsArguments, @Nullable Boolean retryOnBackendError,
398406
@Nullable Long initialRetryDuration, @Nullable Long maxRetryDuration,
399-
@Nullable Double retryMultiplier, @Nullable Integer maxRetryCount) {
407+
@Nullable Double retryMultiplier, @Nullable Integer maxRetryCount, @Nullable Integer readTimeout) {
400408
this.project = project;
401409
this.serviceAccountType = serviceAccountType;
402410
this.serviceFilePath = serviceFilePath;
@@ -416,6 +424,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
416424
this.maxRetryDuration = maxRetryDuration;
417425
this.maxRetryCount = maxRetryCount;
418426
this.retryMultiplier = retryMultiplier;
427+
this.readTimeout = readTimeout;
419428
}
420429

421430
public boolean isLegacySQL() {
@@ -481,6 +490,10 @@ public int getMaxRetryCount() {
481490
return maxRetryCount == null ? DEFAULT_MAX_RETRY_COUNT : maxRetryCount;
482491
}
483492

493+
public int getReadTimeout() {
494+
return readTimeout == null ? DEFAULT_READ_TIMEOUT : readTimeout;
495+
}
496+
484497
@Override
485498
public void validate(FailureCollector failureCollector) {
486499
validate(failureCollector, Collections.emptyMap());
@@ -544,14 +557,15 @@ void validateJobLabelKeyValue(FailureCollector failureCollector) {
544557
!containsMacro(NAME_INITIAL_RETRY_DURATION) && !containsMacro(NAME_MAX_RETRY_DURATION) &&
545558
!containsMacro(NAME_MAX_RETRY_COUNT) && !containsMacro(NAME_RETRY_MULTIPLIER)) {
546559
validateRetryConfiguration(
547-
failureCollector, initialRetryDuration, maxRetryDuration, maxRetryCount, retryMultiplier
560+
failureCollector, initialRetryDuration, maxRetryDuration, maxRetryCount, retryMultiplier, readTimeout
548561
);
549562
}
550563
failureCollector.getOrThrowException();
551564
}
552565

553566
void validateRetryConfiguration(FailureCollector failureCollector, Long initialRetryDuration,
554-
Long maxRetryDuration, Integer maxRetryCount, Double retryMultiplier) {
567+
Long maxRetryDuration, Integer maxRetryCount, Double retryMultiplier,
568+
Integer readTimeout) {
555569
if (initialRetryDuration != null && initialRetryDuration <= 0) {
556570
failureCollector.addFailure("Initial retry duration must be greater than 0.",
557571
"Please specify a valid initial retry duration.")
@@ -577,6 +591,11 @@ void validateRetryConfiguration(FailureCollector failureCollector, Long initialR
577591
"Please specify a valid max retry duration.")
578592
.withConfigProperty(NAME_MAX_RETRY_DURATION);
579593
}
594+
if (readTimeout != null && readTimeout <= 0) {
595+
failureCollector.addFailure("Read timeout must be greater than 0.",
596+
"Please specify a valid read timeout")
597+
.withConfigProperty(NAME_READ_TIMEOUT);
598+
}
580599
}
581600

582601
void validateCmekKey(FailureCollector failureCollector, Map<String, String> arguments) {
@@ -612,7 +631,8 @@ public void validateSQLSyntax(FailureCollector failureCollector, BigQuery bigQue
612631
} else {
613632
errorMessage = e.getMessage();
614633
}
615-
failureCollector.addFailure(String.format("%s.", errorMessage), "Please specify a valid query.")
634+
failureCollector.addFailure(String.format("%s. Error code: %s.", errorMessage, e.getCode()),
635+
"Please specify a valid query.")
616636
.withConfigProperty(SQL);
617637
}
618638
}
@@ -626,7 +646,7 @@ private BigQuery getBigQuery(FailureCollector failureCollector) {
626646
failureCollector.addFailure(e.getMessage(), null);
627647
failureCollector.getOrThrowException();
628648
}
629-
return GCPUtils.getBigQuery(getProject(), credentials);
649+
return GCPUtils.getBigQuery(getProject(), credentials, getReadTimeout());
630650
}
631651

632652
public static Builder builder() {
@@ -656,6 +676,7 @@ public static class Builder {
656676
private Long maxRetryDuration;
657677
private Integer maxRetryCount;
658678
private Double retryMultiplier;
679+
private Integer readTimeout;
659680

660681
public Builder setProject(@Nullable String project) {
661682
this.project = project;
@@ -752,6 +773,11 @@ public Builder setRetryMultiplier(@Nullable Double retryMultiplier) {
752773
return this;
753774
}
754775

776+
public Builder setReadTimeout(@Nullable Integer readTimeout) {
777+
this.readTimeout = readTimeout;
778+
return this;
779+
}
780+
755781
public Config build() {
756782
return new Config(
757783
project,
@@ -772,7 +798,8 @@ public Config build() {
772798
initialRetryDuration,
773799
maxRetryDuration,
774800
retryMultiplier,
775-
maxRetryCount
801+
maxRetryCount,
802+
readTimeout
776803
);
777804
}
778805
}

src/main/java/io/cdap/plugin/gcp/bigquery/common/BigQueryBaseConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public void validateCmekKeyLocation(@Nullable CryptoKeyName cmekKeyName, @Nullab
213213
DatasetId datasetId = DatasetId.of(datasetProjectId, datasetName);
214214
TableId tableId = tableName == null ? null : TableId.of(datasetProjectId, datasetName, tableName);
215215
Credentials credentials = connection.getCredentials(failureCollector);
216-
BigQuery bigQuery = GCPUtils.getBigQuery(connection.getProject(), credentials);
216+
BigQuery bigQuery = GCPUtils.getBigQuery(connection.getProject(), credentials, null);
217217
Storage storage = GCPUtils.getStorage(connection.getProject(), credentials);
218218
if (bigQuery == null || storage == null) {
219219
return;

src/main/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public void test(ConnectorContext context) throws ValidationException {
132132
}
133133

134134
try {
135-
BigQuery bigQuery = GCPUtils.getBigQuery(config.getDatasetProject(), credentials);
135+
BigQuery bigQuery = GCPUtils.getBigQuery(config.getDatasetProject(), credentials, null);
136136
bigQuery.listDatasets(BigQuery.DatasetListOption.pageSize(1));
137137
} catch (Exception e) {
138138
failureCollector.addFailure(String.format("Could not connect to BigQuery: %s", e.getMessage()),
@@ -236,7 +236,7 @@ private BigQuery getBigQuery(String project) throws IOException {
236236
GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), config.isServiceAccountFilePath());
237237
}
238238
// Here project decides where the BQ job is run and under which the datasets is listed
239-
return GCPUtils.getBigQuery(project, credentials);
239+
return GCPUtils.getBigQuery(project, credentials, null);
240240
}
241241

242242
/**

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
8888
Credentials credentials = serviceAccount == null ?
8989
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
9090
String project = config.getProject();
91-
bigQuery = GCPUtils.getBigQuery(project, credentials);
91+
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
9292
FailureCollector collector = context.getFailureCollector();
9393
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
9494
collector.getOrThrowException();

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,6 @@ private static BigQuery getBigQuery(Configuration config) throws IOException {
810810
String projectId = ConfigurationUtil.getMandatoryConfig(config,
811811
BigQueryConfiguration.PROJECT_ID);
812812
Credentials credentials = GCPUtils.loadCredentialsFromConf(config);
813-
return GCPUtils.getBigQuery(projectId, credentials);
813+
return GCPUtils.getBigQuery(projectId, credentials, null);
814814
}
815815
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
136136
// Create BigQuery client
137137
String serviceAccount = config.getServiceAccount();
138138
Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
139-
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials);
139+
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
140140
Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
141141
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
142142

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ void validateCmekKey(FailureCollector collector, Map<String, String> arguments)
186186
}
187187
DatasetId datasetId = DatasetId.of(getDatasetProject(), getDataset());
188188
Credentials credentials = connection.getCredentials(collector);
189-
BigQuery bigQuery = GCPUtils.getBigQuery(getProject(), credentials);
189+
BigQuery bigQuery = GCPUtils.getBigQuery(getProject(), credentials, null);
190190
if (bigQuery == null) {
191191
return;
192192
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public static void deleteBigQueryTemporaryTable(Configuration configuration, Big
171171
String temporaryTable = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
172172
try {
173173
Credentials credentials = getCredentials(config.getConnection());
174-
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials);
174+
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
175175
bigQuery.delete(TableId.of(config.getDatasetProject(), config.getDataset(), temporaryTable));
176176
LOG.debug("Deleted temporary table '{}'", temporaryTable);
177177
} catch (IOException e) {

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void prepareRun(SQLEngineContext context) throws Exception {
156156
datasetName = sqlEngineConfig.getDataset();
157157

158158
// Initialize BQ and GCS clients.
159-
bigQuery = GCPUtils.getBigQuery(project, credentials);
159+
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
160160
storage = GCPUtils.getStorage(project, credentials);
161161
Dataset dataset = bigQuery.getDataset(DatasetId.of(datasetProject, datasetName));
162162
bucket = BigQueryUtil.getStagingBucketName(context.getRuntimeArguments(), sqlEngineConfig.getLocation(),

0 commit comments

Comments
 (0)