Skip to content

Commit 5e03241

Browse files
author
psainics
committed
Add DatastoreErrorDetailsProvider
1 parent edec557 commit 5e03241

6 files changed

Lines changed: 112 additions & 34 deletions

File tree

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public class GCPUtils {
8888
public static final String BQ_SUPPORTED_DOC_URL = "https://cloud.google.com/bigquery/docs/error-messages";
8989
public static final String PUBSUB_SUPPORTED_DOC_URL = "https://cloud.google.com/pubsub/docs/reference/error-codes";
9090
public static final int BQ_DEFAULT_READ_TIMEOUT_SECONDS = 120;
91+
public static final String DATASTORE_SUPPORTED_DOC_URL = "https://cloud.google.com/datastore/docs/concepts/errors";
9192

9293
/**
9394
* Load a service account from the local file system.
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.datastore.common;
18+
19+
import com.google.common.base.Throwables;
20+
import com.google.datastore.v1.client.DatastoreException;
21+
import com.google.rpc.Code;
22+
import io.cdap.cdap.api.exception.ErrorCategory;
23+
import io.cdap.cdap.api.exception.ErrorCodeType;
24+
import io.cdap.cdap.api.exception.ErrorType;
25+
import io.cdap.cdap.api.exception.ErrorUtils;
26+
import io.cdap.cdap.api.exception.ProgramFailureException;
27+
import io.cdap.cdap.etl.api.exception.ErrorContext;
28+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
29+
import io.cdap.plugin.gcp.common.GCPUtils;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
/**
35+
* A custom ErrorDetailsProvider for Datastore plugins.
36+
*/
37+
public class DatastoreErrorDetailsProvider extends GCPErrorDetailsProvider {
38+
39+
static Map<Code, ErrorUtils.ActionErrorPair> actionErrorMap = new HashMap<>();
40+
41+
static {
42+
actionErrorMap.put(Code.CANCELLED, ErrorUtils.getActionErrorByStatusCode(499));
43+
actionErrorMap.put(Code.UNKNOWN, ErrorUtils.getActionErrorByStatusCode(500));
44+
actionErrorMap.put(Code.INVALID_ARGUMENT, ErrorUtils.getActionErrorByStatusCode(400));
45+
actionErrorMap.put(Code.DEADLINE_EXCEEDED, ErrorUtils.getActionErrorByStatusCode(504));
46+
actionErrorMap.put(Code.NOT_FOUND, ErrorUtils.getActionErrorByStatusCode(404));
47+
actionErrorMap.put(Code.ALREADY_EXISTS, ErrorUtils.getActionErrorByStatusCode(409));
48+
actionErrorMap.put(Code.PERMISSION_DENIED, ErrorUtils.getActionErrorByStatusCode(403));
49+
actionErrorMap.put(Code.UNAUTHENTICATED, ErrorUtils.getActionErrorByStatusCode(401));
50+
actionErrorMap.put(Code.RESOURCE_EXHAUSTED, ErrorUtils.getActionErrorByStatusCode(429));
51+
actionErrorMap.put(Code.FAILED_PRECONDITION, ErrorUtils.getActionErrorByStatusCode(400));
52+
actionErrorMap.put(Code.ABORTED, ErrorUtils.getActionErrorByStatusCode(409));
53+
actionErrorMap.put(Code.OUT_OF_RANGE, ErrorUtils.getActionErrorByStatusCode(400));
54+
actionErrorMap.put(Code.UNIMPLEMENTED, ErrorUtils.getActionErrorByStatusCode(501));
55+
actionErrorMap.put(Code.INTERNAL, ErrorUtils.getActionErrorByStatusCode(500));
56+
actionErrorMap.put(Code.UNAVAILABLE, ErrorUtils.getActionErrorByStatusCode(503));
57+
actionErrorMap.put(Code.DATA_LOSS, ErrorUtils.getActionErrorByStatusCode(500));
58+
}
59+
60+
@Override
61+
protected String getExternalDocumentationLink() {
62+
return GCPUtils.DATASTORE_SUPPORTED_DOC_URL;
63+
}
64+
65+
@Override
66+
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
67+
ProgramFailureException ex = super.getExceptionDetails(e, errorContext);
68+
if (ex != null) {
69+
return ex;
70+
}
71+
List<Throwable> causalChain = Throwables.getCausalChain(e);
72+
for (Throwable t : causalChain) {
73+
if (t instanceof DatastoreException) {
74+
return getProgramFailureExceptionFromDatastoreException((DatastoreException) t);
75+
}
76+
}
77+
return null;
78+
}
79+
80+
private ProgramFailureException getProgramFailureExceptionFromDatastoreException(DatastoreException de) {
81+
String errorCodeName = de.getCode().name();
82+
ErrorUtils.ActionErrorPair actionErrorPair = null;
83+
String errorReason = de.getMessage();
84+
String errorMessage = de.getMessage();
85+
if (actionErrorMap.containsKey(de.getCode())) {
86+
actionErrorPair = actionErrorMap.get(de.getCode());
87+
errorReason = String.format("%s %s. %s", errorCodeName, errorMessage, actionErrorPair.getCorrectiveAction());
88+
}
89+
if (!errorReason.endsWith(".")) {
90+
errorReason = errorReason + ".";
91+
}
92+
errorReason = String.format("%s For more details, see %s", errorReason, GCPUtils.DATASTORE_SUPPORTED_DOC_URL);
93+
94+
String errorMessageWithCode = String.format("[ErrorCode='%s'] %s", errorCodeName, errorMessage);
95+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
96+
errorReason, String.format("%s: %s", de.getClass().getName(), errorMessageWithCode),
97+
actionErrorPair != null ? actionErrorPair.getErrorType() : ErrorType.UNKNOWN, true, ErrorCodeType.HTTP,
98+
errorCodeName, GCPUtils.DATASTORE_SUPPORTED_DOC_URL, de);
99+
}
100+
}

src/main/java/io/cdap/plugin/gcp/datastore/exception/DatastoreExecutionException.java

Lines changed: 0 additions & 32 deletions
This file was deleted.

src/main/java/io/cdap/plugin/gcp/datastore/sink/DatastoreSink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3232
import io.cdap.cdap.etl.api.batch.BatchSink;
3333
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
34+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3435
import io.cdap.plugin.common.LineageRecorder;
36+
import io.cdap.plugin.gcp.datastore.common.DatastoreErrorDetailsProvider;
3537
import org.apache.hadoop.io.NullWritable;
3638
import org.slf4j.Logger;
3739
import org.slf4j.LoggerFactory;
@@ -85,6 +87,8 @@ public void prepareRun(BatchSinkContext context) {
8587
String batchSize = Integer.toString(config.getBatchSize());
8688
String shouldUseTransactions = Boolean.toString(config.shouldUseTransactions());
8789

90+
// set error details provider
91+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(DatastoreErrorDetailsProvider.class.getName()));
8892
context.addOutput(Output.of(config.getReferenceName(),
8993
new DatastoreOutputFormatProvider(project, serviceAccount,
9094
config.isServiceAccountFilePath(),

src/main/java/io/cdap/plugin/gcp/datastore/source/DatastoreInputFormat.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.datastore.v1.client.DatastoreHelper;
2424
import com.google.datastore.v1.client.QuerySplitter;
2525
import com.google.protobuf.TextFormat;
26-
import io.cdap.plugin.gcp.datastore.exception.DatastoreExecutionException;
2726
import io.cdap.plugin.gcp.datastore.source.util.DatastoreSourceConstants;
2827
import io.cdap.plugin.gcp.datastore.util.DatastoreUtil;
2928
import org.apache.hadoop.conf.Configuration;
@@ -75,7 +74,7 @@ public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
7574
.map(QueryInputSplit::new)
7675
.collect(Collectors.toList());
7776
} catch (DatastoreException e) {
78-
throw new DatastoreExecutionException("Unable to split the query: " + query, e);
77+
throw new IOException("Unable to split the query: " + query, e);
7978
}
8079
}
8180

src/main/java/io/cdap/plugin/gcp/datastore/source/DatastoreSource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,12 @@
4343
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
4444
import io.cdap.cdap.etl.api.batch.BatchSource;
4545
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
46+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
4647
import io.cdap.plugin.common.LineageRecorder;
48+
import io.cdap.plugin.gcp.datastore.common.DatastoreErrorDetailsProvider;
4749
import io.cdap.plugin.gcp.datastore.source.util.DatastoreSourceConstants;
4850
import io.cdap.plugin.gcp.datastore.util.DatastoreUtil;
51+
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
4952
import org.apache.hadoop.io.NullWritable;
5053
import org.slf4j.Logger;
5154
import org.slf4j.LoggerFactory;
@@ -129,6 +132,9 @@ public void prepareRun(BatchSourceContext batchSourceContext) {
129132
String pbQuery = config.constructPbQuery(collector).toString();
130133
String splits = String.valueOf(config.getNumSplits());
131134

135+
// set error details provider
136+
batchSourceContext.setErrorDetailsProvider(
137+
new ErrorDetailsProviderSpec(DatastoreErrorDetailsProvider.class.getName()));
132138
batchSourceContext.setInput(
133139
Input.of(config.getReferenceName(),
134140
new DatastoreInputFormatProvider(project, serviceAccount, config.isServiceAccountFilePath(), namespace,

0 commit comments

Comments
 (0)