Skip to content

Commit 24f640b

Browse files
author
psainics
committed
Enhance error handling in DelegatingMultiSinkOutputCommitter by adding logging for task and job commit failures
1 parent e5a08fc commit 24f640b

1 file changed

Lines changed: 27 additions & 6 deletions

File tree

src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkOutputCommitter.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.hadoop.mapreduce.JobStatus;
2424
import org.apache.hadoop.mapreduce.OutputCommitter;
2525
import org.apache.hadoop.mapreduce.TaskAttemptContext;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2628

2729
import java.io.IOException;
2830
import java.util.HashMap;
@@ -35,6 +37,7 @@
3537
* Delegated instances are supplied along with a schema, which is used to configure the commit operation.
3638
*/
3739
public class DelegatingMultiSinkOutputCommitter extends OutputCommitter {
40+
private static final Logger LOG = LoggerFactory.getLogger(DelegatingMultiSinkOutputCommitter.class);
3841
private final Map<String, OutputCommitter> committerMap;
3942
private final Map<String, Schema> schemaMap;
4043
private final String projectName;
@@ -99,18 +102,28 @@ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOE
99102
@Override
100103
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
101104
for (String tableName : committerMap.keySet()) {
102-
configureContext(taskAttemptContext, tableName);
103-
104-
committerMap.get(tableName).commitTask(taskAttemptContext);
105+
try {
106+
configureContext(taskAttemptContext, tableName);
107+
committerMap.get(tableName).commitTask(taskAttemptContext);
108+
} catch (IOException e) {
109+
LOG.warn("BigQuery multi-sink table '{}' failed during task commit. Reason: {}",
110+
tableName, getFailureReason(e), e);
111+
throw e;
112+
}
105113
}
106114
}
107115

108116
@Override
109117
public void commitJob(JobContext jobContext) throws IOException {
110118
for (String tableName : committerMap.keySet()) {
111-
configureContext(jobContext, tableName);
112-
113-
committerMap.get(tableName).commitJob(jobContext);
119+
try {
120+
configureContext(jobContext, tableName);
121+
committerMap.get(tableName).commitJob(jobContext);
122+
} catch (IOException e) {
123+
LOG.warn("BigQuery multi-sink table '{}' failed during job commit. Reason: {}",
124+
tableName, getFailureReason(e), e);
125+
throw e;
126+
}
114127
}
115128
}
116129

@@ -168,4 +181,12 @@ public void configureContext(JobContext context, String tableName) throws IOExce
168181
gcsPath,
169182
fields);
170183
}
184+
185+
private String getFailureReason(IOException exception) {
186+
Throwable rootCause = exception;
187+
while (rootCause.getCause() != null) {
188+
rootCause = rootCause.getCause();
189+
}
190+
return rootCause.getMessage() == null ? exception.getMessage() : rootCause.getMessage();
191+
}
171192
}

0 commit comments

Comments
 (0)