|
39 | 39 | import io.cdap.cdap.etl.api.batch.BatchSink; |
40 | 40 | import io.cdap.cdap.etl.api.batch.BatchSinkContext; |
41 | 41 | import io.cdap.cdap.etl.api.connector.Connector; |
| 42 | +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; |
42 | 43 | import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat; |
43 | 44 | import io.cdap.plugin.common.Asset; |
44 | 45 | import io.cdap.plugin.common.ConfigUtil; |
|
51 | 52 | import io.cdap.plugin.format.plugin.FileSinkProperties; |
52 | 53 | import io.cdap.plugin.gcp.common.CmekUtils; |
53 | 54 | import io.cdap.plugin.gcp.common.GCPConnectorConfig; |
| 55 | +import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider; |
54 | 56 | import io.cdap.plugin.gcp.common.GCPUtils; |
55 | 57 | import io.cdap.plugin.gcp.gcs.Formats; |
56 | 58 | import io.cdap.plugin.gcp.gcs.GCSPath; |
@@ -121,30 +123,50 @@ public void prepareRun(BatchSinkContext context) throws Exception { |
121 | 123 | collector.addFailure("Service account type is undefined.", |
122 | 124 | "Must be `filePath` or `JSON`"); |
123 | 125 | collector.getOrThrowException(); |
124 | | - return; |
125 | 126 | } |
126 | | - Credentials credentials = config.connection.getServiceAccount() == null ? |
127 | | - null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath); |
| 127 | + |
| 128 | + Credentials credentials = null; |
| 129 | + try { |
| 130 | + credentials = config.connection.getServiceAccount() == null ? |
| 131 | + null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), |
| 132 | + isServiceAccountFilePath); |
| 133 | + } catch (Exception e) { |
| 134 | + String errorReason = "Unable to load service account credentials."; |
| 135 | + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null) |
| 136 | + .withStacktrace(e.getStackTrace()); |
| 137 | + collector.getOrThrowException(); |
| 138 | + } |
| 139 | + |
| 140 | + String bucketName = config.getBucket(collector); |
128 | 141 | Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials); |
| 142 | + String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket."; |
| 143 | + String correctiveAction = "Ensure you entered the correct bucket path and " |
| 144 | + + "have permissions for it."; |
129 | 145 | Bucket bucket; |
130 | | - String location; |
| 146 | + String location = null; |
131 | 147 | try { |
132 | | - bucket = storage.get(config.getBucket()); |
| 148 | + bucket = storage.get(bucketName); |
| 149 | + if (bucket != null) { |
| 150 | + location = bucket.getLocation(); |
| 151 | + } else { |
| 152 | + location = config.getLocation(); |
| 153 | + GCPUtils.createBucket(storage, bucketName, location, cmekKeyName); |
| 154 | + } |
133 | 155 | } catch (StorageException e) { |
134 | | - throw new RuntimeException( |
135 | | - String.format("Unable to access or create bucket %s. ", config.getBucket()) |
136 | | - + "Ensure you entered the correct bucket path and have permissions for it.", e); |
137 | | - } |
138 | | - if (bucket != null) { |
139 | | - location = bucket.getLocation(); |
140 | | - } else { |
141 | | - GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName); |
142 | | - location = config.getLocation(); |
| 156 | + String errorReason = String.format(errorReasonFormat, e.getCode()); |
| 157 | + collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction) |
| 158 | + .withStacktrace(e.getStackTrace()); |
| 159 | + collector.getOrThrowException(); |
143 | 160 | } |
| 161 | + |
144 | 162 | this.outputPath = getOutputDir(context); |
145 | 163 | // create asset for lineage |
146 | 164 | asset = Asset.builder(config.getReferenceName()) |
147 | 165 | .setFqn(GCSPath.getFQN(config.getPath())).setLocation(location).build(); |
| 166 | + |
| 167 | + // set error details provider |
| 168 | + context.setErrorDetailsProvider( |
| 169 | + new ErrorDetailsProviderSpec(GCPErrorDetailsProvider.class.getName())); |
148 | 170 |
|
149 | 171 | // super is called down here to avoid instantiating the lineage recorder with a null asset |
150 | 172 | super.prepareRun(context); |
@@ -532,8 +554,20 @@ public void validateContentType(FailureCollector failureCollector) { |
532 | 554 | } |
533 | 555 | } |
534 | 556 |
|
535 | | - public String getBucket() { |
536 | | - return GCSPath.from(path).getBucket(); |
| 557 | + /** |
| 558 | + * Get the bucket name from the path. |
| 559 | + * @param collector failure collector |
| 560 | + * @return bucket name as {@link String} if found, otherwise null. |
| 561 | + */ |
| 562 | + public String getBucket(FailureCollector collector) { |
| 563 | + try { |
| 564 | + return GCSPath.from(path).getBucket(); |
| 565 | + } catch (IllegalArgumentException e) { |
| 566 | + collector.addFailure(e.getMessage(), null) |
| 567 | + .withStacktrace(e.getStackTrace()); |
| 568 | + collector.getOrThrowException(); |
| 569 | + } |
| 570 | + return null; |
537 | 571 | } |
538 | 572 |
|
539 | 573 | @Override |
@@ -718,8 +752,8 @@ public Builder setCustomContentType(@Nullable String customContentType) { |
718 | 752 | return this; |
719 | 753 | } |
720 | 754 |
|
721 | | - public GCSBatchSink.GCSBatchSinkConfig build() { |
722 | | - return new GCSBatchSink.GCSBatchSinkConfig( |
| 755 | + public GCSBatchSinkConfig build() { |
| 756 | + return new GCSBatchSinkConfig( |
723 | 757 | referenceName, |
724 | 758 | project, |
725 | 759 | fileSystemProperties, |
|
0 commit comments