Skip to content

Commit c6bf17d

Browse files
author
Pushpender Saini
authored
Merge pull request #1496 from cloudsufi/BigQueryActionPluginErrorMang
[PLUGIN-1849] Error Management for BigQuery Action plugin
2 parents afa2d5c + 5821daa commit c6bf17d

7 files changed

Lines changed: 258 additions & 45 deletions

File tree

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetter.java

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.auth.Credentials;
2020
import com.google.cloud.StringEnumValue;
2121
import com.google.cloud.bigquery.BigQuery;
22+
import com.google.cloud.bigquery.BigQueryException;
2223
import com.google.cloud.bigquery.Field;
2324
import com.google.cloud.bigquery.FieldValue;
2425
import com.google.cloud.bigquery.FieldValueList;
@@ -33,8 +34,12 @@
3334
import io.cdap.cdap.api.annotation.Description;
3435
import io.cdap.cdap.api.annotation.Name;
3536
import io.cdap.cdap.api.annotation.Plugin;
37+
import io.cdap.cdap.api.exception.ErrorCategory;
38+
import io.cdap.cdap.api.exception.ErrorType;
39+
import io.cdap.cdap.api.exception.ErrorUtils;
3640
import io.cdap.cdap.etl.api.action.Action;
3741
import io.cdap.cdap.etl.api.action.ActionContext;
42+
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorUtil;
3843
import io.cdap.plugin.gcp.common.GCPUtils;
3944
import org.slf4j.Logger;
4045
import org.slf4j.LoggerFactory;
@@ -70,34 +75,74 @@ public AbstractBigQueryActionConfig getConfig() {
7075
}
7176

7277
@Override
73-
public void run(ActionContext context) throws Exception {
78+
public void run(ActionContext context) {
7479
config.validate(context.getFailureCollector());
7580

7681
QueryJobConfiguration queryConfig = config.getQueryJobConfiguration(context.getFailureCollector());
7782
JobId jobId = JobId.newBuilder().setRandomJob().build();
7883

7984
// API request - starts the query.
80-
Credentials credentials = config.getServiceAccount() == null ?
81-
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
82-
config.isServiceAccountFilePath());
85+
Credentials credentials = null;
86+
try {
87+
credentials = config.getServiceAccount() == null ? null :
88+
GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), config.isServiceAccountFilePath());
89+
} catch (Exception e) {
90+
context.getFailureCollector().addFailure(
91+
String.format("Failed to load service account credentials, %s: %s",
92+
e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace());
93+
context.getFailureCollector().getOrThrowException();
94+
}
8395
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
8496
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
8597

8698
LOG.info("Executing SQL as job {}.", jobId.getJob());
8799
LOG.debug("The BigQuery SQL {}", queryConfig.getQuery());
88100

89101
// Wait for the query to complete
90-
queryJob.waitFor();
102+
try {
103+
queryJob.waitFor();
104+
} catch (BigQueryException e) {
105+
String errorMessage = String.format("The bigquery query job failed, %s: %s",
106+
e.getClass().getName(), e.getMessage());
107+
throw BigQueryErrorUtil.getProgramFailureException(errorMessage, (e).getReason(), e);
108+
} catch (InterruptedException e) {
109+
String errorMessage = String.format("The bigquery query job interrupted, %s: %s",
110+
e.getClass().getName(), e.getMessage());
111+
throw ErrorUtils.getProgramFailureException(
112+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
113+
ErrorType.UNKNOWN, true, e);
114+
}
91115

92116
// Check for errors
93117
if (queryJob.getStatus().getError() != null) {
94-
throw new RuntimeException(queryJob.getStatus().getExecutionErrors().toString());
118+
String errorReason = String.format(
119+
"The bigquery job failed with reason: %s. For more details, see %s",
120+
queryJob.getStatus().getError().getReason(), GCPUtils.BQ_SUPPORTED_DOC_URL);
121+
ErrorType type = BigQueryErrorUtil.getErrorType(queryJob.getStatus().getError().getReason());
122+
throw ErrorUtils.getProgramFailureException(
123+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason,
124+
queryJob.getStatus().getExecutionErrors().toString(), type, true, null, null,
125+
GCPUtils.BQ_SUPPORTED_DOC_URL, null);
126+
}
127+
TableResult queryResults;
128+
try {
129+
queryResults = queryJob.getQueryResults();
130+
} catch (BigQueryException e) {
131+
String errorMessage = String.format("The bigquery query job failed, %s: %s",
132+
e.getClass().getName(), e.getMessage());
133+
throw BigQueryErrorUtil.getProgramFailureException(errorMessage, e.getReason(), e);
134+
} catch (InterruptedException e) {
135+
String errorMessage = String.format("The bigquery query job interrupted, %s: %s",
136+
e.getClass().getName(), e.getMessage());
137+
throw ErrorUtils.getProgramFailureException(
138+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
139+
ErrorType.UNKNOWN, false, e);
95140
}
96-
97-
TableResult queryResults = queryJob.getQueryResults();
98141
if (queryResults.getTotalRows() == 0 || queryResults.getTotalRows() > 1) {
99-
throw new RuntimeException(String.format("The query result total rows should be \"1\" but is \"%d\"",
100-
queryResults.getTotalRows()));
142+
String error = String.format("The query result total rows should be \"1\" but is \"%d\"",
143+
queryResults.getTotalRows());
144+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
145+
error, error, ErrorType.USER, false, null);
101146
}
102147

103148
Schema schema = queryResults.getSchema();

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryArgumentSetterConfig.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import io.cdap.cdap.api.annotation.Description;
2828
import io.cdap.cdap.api.annotation.Macro;
2929
import io.cdap.cdap.api.annotation.Name;
30+
import io.cdap.cdap.api.exception.ErrorCategory;
31+
import io.cdap.cdap.api.exception.ErrorType;
32+
import io.cdap.cdap.api.exception.ErrorUtils;
3033
import io.cdap.cdap.etl.api.FailureCollector;
3134
import io.cdap.plugin.common.ConfigUtil;
3235
import io.cdap.plugin.gcp.bigquery.source.BigQuerySource;
@@ -224,9 +227,11 @@ private void checkIfArgumentsColumnsExitsInSource(Map<String, String> argumentCo
224227
String nonExistingColumnNames = argumentConditionMap.keySet().stream()
225228
.filter(columnName -> !argumentConditionFields.containsKey(columnName))
226229
.collect(Collectors.joining(" ,"));
227-
throw new RuntimeException(String.format(
230+
String error = String.format(
228231
"Columns: \" %s \"do not exist in table. Argument selections columns must exist in table.",
229-
nonExistingColumnNames));
232+
nonExistingColumnNames);
233+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
234+
error, error, ErrorType.USER, false, null);
230235
}
231236

232237
static void checkIfArgumentsColumnsListExistsInSource(

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java

Lines changed: 93 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,20 @@
4444
import io.cdap.cdap.api.annotation.Macro;
4545
import io.cdap.cdap.api.annotation.Name;
4646
import io.cdap.cdap.api.annotation.Plugin;
47+
import io.cdap.cdap.api.exception.ErrorCategory;
48+
import io.cdap.cdap.api.exception.ErrorCodeType;
49+
import io.cdap.cdap.api.exception.ErrorType;
50+
import io.cdap.cdap.api.exception.ErrorUtils;
4751
import io.cdap.cdap.etl.api.FailureCollector;
4852
import io.cdap.cdap.etl.api.action.Action;
4953
import io.cdap.cdap.etl.api.action.ActionContext;
5054
import io.cdap.cdap.etl.common.Constants;
55+
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorUtil;
5156
import io.cdap.plugin.gcp.bigquery.exception.BigQueryJobExecutionException;
5257
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
5358
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
5459
import io.cdap.plugin.gcp.common.CmekUtils;
60+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
5561
import io.cdap.plugin.gcp.common.GCPUtils;
5662
import org.slf4j.Logger;
5763
import org.slf4j.LoggerFactory;
@@ -93,7 +99,7 @@ public final class BigQueryExecute extends AbstractBigQueryAction {
9399
}
94100

95101
@Override
96-
public void run(ActionContext context) throws Exception {
102+
public void run(ActionContext context) {
97103
FailureCollector collector = context.getFailureCollector();
98104
config.validate(collector, context.getArguments().asMap());
99105
QueryJobConfiguration.Builder builder = QueryJobConfiguration.newBuilder(config.getSql());
@@ -125,9 +131,16 @@ public void run(ActionContext context) throws Exception {
125131
builder.setUseLegacySql(config.isLegacySQL());
126132

127133
// API request - starts the query.
128-
Credentials credentials = config.getServiceAccount() == null ?
129-
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
130-
config.isServiceAccountFilePath());
134+
Credentials credentials = null;
135+
try {
136+
credentials = config.getServiceAccount() == null ?
137+
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
138+
config.isServiceAccountFilePath());
139+
} catch (IOException e) {
140+
collector.addFailure(String.format("Failed to load service account credentials, %s: %s",
141+
e.getClass().getName(), e.getMessage()), null).withStacktrace(e.getStackTrace());
142+
collector.getOrThrowException();
143+
}
131144
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, config.getReadTimeout());
132145
//create dataset to store the results if not exists
133146
if (config.getStoreResults() && !Strings.isNullOrEmpty(datasetName) &&
@@ -152,23 +165,46 @@ public void run(ActionContext context) throws Exception {
152165
try {
153166
executeQueryWithExponentialBackoff(bigQuery, queryConfig, context);
154167
} catch (Throwable e) {
155-
throw new RuntimeException(e);
168+
String errorMessage = String.format(
169+
"Failed to execute query with exponential backoff, %s: %s", e.getClass().getName(),
170+
e.getMessage());
171+
if (e instanceof BigQueryException) {
172+
throw BigQueryErrorUtil.getProgramFailureException(errorMessage,
173+
((BigQueryException) e).getReason(), (Exception) e);
174+
}
175+
throw ErrorUtils.getProgramFailureException(
176+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
177+
ErrorType.UNKNOWN, true, e);
156178
}
157179
} else {
158-
executeQuery(bigQuery, queryConfig, context);
180+
try {
181+
executeQuery(bigQuery, queryConfig, context);
182+
} catch (Exception e) {
183+
String errorMessage = String.format("The bigquery query execution failed, %s: %s",
184+
e.getClass().getName(), e.getMessage());
185+
String errorReason = null;
186+
if (e instanceof BigQueryException) {
187+
errorReason = ((BigQueryException) e).getReason();
188+
}
189+
throw BigQueryErrorUtil.getProgramFailureException(errorMessage, errorReason, e);
190+
}
159191
}
160192
}
161193

162194
protected void executeQueryWithExponentialBackoff(BigQuery bigQuery,
163-
QueryJobConfiguration queryConfig, ActionContext context)
164-
throws Throwable {
195+
QueryJobConfiguration queryConfig, ActionContext context) {
165196
try {
166197
Failsafe.with(getRetryPolicy()).run(() -> executeQuery(bigQuery, queryConfig, context));
167198
} catch (FailsafeException e) {
199+
String errorReason = String.format("The bigquery query execution failed with message: %s",
200+
e.getMessage());
168201
if (e.getCause() != null) {
169-
throw e.getCause();
202+
errorReason = String.format("The bigquery query execution failed with message: %s",
203+
e.getCause().getMessage());
170204
}
171-
throw e;
205+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(
206+
e.getCause() == null ? e : e.getCause(), errorReason, ErrorType.UNKNOWN, true,
207+
GCPUtils.BQ_SUPPORTED_DOC_URL);
172208
}
173209
}
174210

@@ -185,7 +221,7 @@ private RetryPolicy<Object> getRetryPolicy() {
185221
}
186222

187223
private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig, ActionContext context)
188-
throws InterruptedException, BigQueryJobExecutionException {
224+
throws BigQueryJobExecutionException {
189225
// Location must match that of the dataset(s) referenced in the query.
190226
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
191227
Job queryJob;
@@ -199,25 +235,60 @@ private void executeQuery(BigQuery bigQuery, QueryJobConfiguration queryConfig,
199235
// Wait for the query to complete
200236
queryJob = queryJob.waitFor();
201237
} catch (BigQueryException e) {
238+
String errorMessage = String.format("The bigquery query execution failed, %s: %s",
239+
e.getClass().getName(), e.getMessage());
202240
LOG.error("The query job {} failed. Error: {}", jobId.getJob(), e.getError().getMessage());
203241
if (RETRY_ON_REASON.contains(e.getError().getReason())) {
204242
throw new BigQueryJobExecutionException(e.getError().getMessage(), e);
205243
}
206-
throw new RuntimeException(e);
244+
throw BigQueryErrorUtil.getProgramFailureException(errorMessage, e.getReason(), e);
245+
} catch (InterruptedException e) {
246+
String errorMessage = String.format("The bigquery query execution interrupted, %s: %s",
247+
e.getClass().getName(), e.getMessage());
248+
throw ErrorUtils.getProgramFailureException(
249+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
250+
ErrorType.UNKNOWN, true, e);
207251
}
208252

209253
// Check for errors
210254
if (queryJob.getStatus().getError() != null) {
211255
// You can also look at queryJob.getStatus().getExecutionErrors() for all
212256
// errors, not just the latest one.
213-
LOG.error("The query job {} failed. Error: {}", jobId.getJob(), queryJob.getStatus().getError());
257+
LOG.error("The query job {} failed with reason: {} and error: {}.", jobId.getJob(),
258+
queryJob.getStatus().getError().getReason(),
259+
queryJob.getStatus().getExecutionErrors().toString());
214260
if (RETRY_ON_REASON.contains(queryJob.getStatus().getError().getReason())) {
215261
throw new BigQueryJobExecutionException(queryJob.getStatus().getError().getMessage());
216262
}
217-
throw new RuntimeException(queryJob.getStatus().getError().getMessage());
263+
String errorReason = String.format(
264+
"The bigquery query execution failed due to reason: %s and error: %s. "
265+
+ "For more details, see %s", queryJob.getStatus().getError().getReason(),
266+
queryJob.getStatus().getExecutionErrors().toString(), GCPUtils.BQ_SUPPORTED_DOC_URL);
267+
String errorMessage = String.format(
268+
"The bigquery query execution failed due to reason: %s , error: %s and message: %s",
269+
queryJob.getStatus().getError().getReason(),
270+
queryJob.getStatus().getExecutionErrors().toString(),
271+
queryJob.getStatus().getError().getMessage());
272+
ErrorType type = BigQueryErrorUtil.getErrorType(queryJob.getStatus().getError().getReason());
273+
throw ErrorUtils.getProgramFailureException(
274+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage,
275+
type, true, null, null, GCPUtils.BQ_SUPPORTED_DOC_URL, null);
276+
}
277+
278+
TableResult queryResults;
279+
try {
280+
queryResults = queryJob.getQueryResults();
281+
} catch (BigQueryException e) {
282+
String errorMessage = String.format("Failed to retrieve query result, %s: %s",
283+
e.getClass().getName(), e.getMessage());
284+
throw BigQueryErrorUtil.getProgramFailureException(errorMessage, e.getReason(), e);
285+
} catch (InterruptedException e) {
286+
String errorMessage = String.format("Query result retrieval was interrupted, %s: %s",
287+
e.getClass().getName(), e.getMessage());
288+
throw ErrorUtils.getProgramFailureException(
289+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
290+
ErrorType.UNKNOWN, true, e);
218291
}
219-
220-
TableResult queryResults = queryJob.getQueryResults();
221292
long rows = queryResults.getTotalRows();
222293

223294
if (config.shouldSetAsArguments()) {
@@ -659,11 +730,12 @@ public void validateSQLSyntax(FailureCollector failureCollector, BigQuery bigQue
659730
bigQuery.create(JobInfo.of(queryJobConfiguration));
660731
} catch (BigQueryException e) {
661732
final String errorMessage;
662-
if (e.getCode() == ERROR_CODE_NOT_FOUND) {
663-
errorMessage = String.format("Resource was not found. Please verify the resource name. If the resource " +
664-
"will be created at runtime, then update to use a macro for the resource name. Error message received " +
665-
"was: %s", e.getMessage());
666-
} else {
733+
if (e.getCode() == ERROR_CODE_NOT_FOUND) {
734+
errorMessage = String.format(
735+
"Resource was not found. Please verify the resource name. If the resource will be "
736+
+ "created at runtime, then update to use a macro for the resource name. "
737+
+ "Error message received was %s: %s", e.getClass().getName(), e.getMessage());
738+
} else {
667739
errorMessage = e.getMessage();
668740
}
669741
failureCollector.addFailure(String.format("%s. Error code: %s.", errorMessage, e.getCode()),

0 commit comments

Comments
 (0)