Skip to content

Commit d42111c

Browse files
authored
Merge pull request #37748: Where a Throwable is being caught, take care to rethrow OutOfMemoryError
1 parent 38b416c commit d42111c

27 files changed

Lines changed: 65 additions & 22 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,8 @@ public void startBundle() {
176176
// This can contain user code. Wrap it in case it throws an exception.
177177
try {
178178
invoker.invokeStartBundle(new DoFnStartBundleArgumentProvider());
179-
} catch (Throwable t) {
180-
// Exception in user code.
181-
throw wrapUserCodeException(t);
179+
} catch (Exception e) {
180+
throw wrapUserCodeException(e);
182181
}
183182
}
184183

@@ -215,8 +214,8 @@ private void invokeProcessElement(WindowedValue<InputT> elem) {
215214
// This can contain user code. Wrap it in case it throws an exception.
216215
try {
217216
invoker.invokeProcessElement(new DoFnProcessContext(elem));
218-
} catch (Exception ex) {
219-
throw wrapUserCodeException(ex);
217+
} catch (Exception e) {
218+
throw wrapUserCodeException(e);
220219
}
221220
}
222221

@@ -225,9 +224,8 @@ public void finishBundle() {
225224
// This can contain user code. Wrap it in case it throws an exception.
226225
try {
227226
invoker.invokeFinishBundle(new DoFnFinishBundleArgumentProvider());
228-
} catch (Throwable t) {
229-
// Exception in user code.
230-
throw wrapUserCodeException(t);
227+
} catch (Exception e) {
228+
throw wrapUserCodeException(e);
231229
}
232230
}
233231

@@ -237,8 +235,8 @@ public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, K
237235
new OnWindowExpirationArgumentProvider<>(window, timestamp, key));
238236
}
239237

240-
private RuntimeException wrapUserCodeException(Throwable t) {
241-
throw UserCodeException.wrapIf(!isSystemDoFn(), t);
238+
private RuntimeException wrapUserCodeException(Exception e) {
239+
throw UserCodeException.wrapIf(!isSystemDoFn(), e);
242240
}
243241

244242
private boolean isSystemDoFn() {

runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,9 @@ public DirectPipelineResult run(Pipeline pipeline) {
223223
result.waitUntilFinish();
224224
} catch (UserCodeException userException) {
225225
throw new PipelineExecutionException(userException.getCause());
226+
} catch (RuntimeException | OutOfMemoryError e) {
227+
throw e;
226228
} catch (Throwable t) {
227-
if (t instanceof RuntimeException) {
228-
throw (RuntimeException) t;
229-
}
230229
throw new RuntimeException(t);
231230
}
232231
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,11 +276,11 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr
276276
executeWork(worker, progressUpdater);
277277
workItemStatusClient.reportSuccess();
278278
return true;
279-
279+
} catch (OutOfMemoryError oom) {
280+
throw oom;
280281
} catch (Throwable e) {
281282
workItemStatusClient.reportError(e);
282283
return false;
283-
284284
} finally {
285285
if (worker != null) {
286286
try {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,9 @@ private void invokeProcessElement(WindowedValue<InputT> elem) {
9292
// This can contain user code. Wrap it in case it throws an exception.
9393
try {
9494
fn.processElement(elem.getValue(), options, stepContext, sideInputReader, outputManager);
95+
} catch (RuntimeException ex) {
96+
throw ex;
9597
} catch (Exception ex) {
96-
if (ex instanceof RuntimeException) {
97-
throw (RuntimeException) ex;
98-
}
99-
10098
throw new RuntimeException(ex);
10199
}
102100
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ private DoFnRunner<InputT, KV<K, Iterable<V>>> createRunner() {
172172
output -> {
173173
try {
174174
receiver.process(output);
175+
} catch (OutOfMemoryError oom) {
176+
throw oom;
175177
} catch (Throwable t) {
176178
throw new RuntimeException(t);
177179
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,8 @@ static ApproximateReportedProgress getReaderProgress(BoundedSource.BoundedReader
680680
if (fractionConsumed != null) {
681681
progress.setFractionConsumed(fractionConsumed);
682682
}
683+
} catch (OutOfMemoryError oom) {
684+
throw oom;
683685
} catch (Throwable t) {
684686
LOG.warn("Error estimating fraction consumed from reader {}", reader, t);
685687
}
@@ -689,6 +691,8 @@ static ApproximateReportedProgress getReaderProgress(BoundedSource.BoundedReader
689691
if (parallelism != null) {
690692
progress.setConsumedParallelism(parallelism);
691693
}
694+
} catch (OutOfMemoryError oom) {
695+
throw oom;
692696
} catch (Throwable t) {
693697
LOG.warn("Error estimating consumed parallelism from reader {}", reader, t);
694698
}
@@ -698,6 +702,8 @@ static ApproximateReportedProgress getReaderProgress(BoundedSource.BoundedReader
698702
if (parallelism != null) {
699703
progress.setRemainingParallelism(parallelism);
700704
}
705+
} catch (OutOfMemoryError oom) {
706+
throw oom;
701707
} catch (Throwable t) {
702708
LOG.warn("Error estimating remaining parallelism from reader {}", reader, t);
703709
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ private void reportFailure(String message, Exception e, int code) {
377377
if (manager != null) {
378378
manager.error(message, e, code);
379379
}
380-
} catch (Throwable t) {
380+
} catch (Exception ex) {
381381
// Failed to report logging failure. No meaningful action left.
382382
}
383383
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class JfrInterop {
5959
recordingClose = jfrClass.getMethod("close");
6060

6161
jfrConfig = configClass.getMethod("getConfiguration", String.class).invoke(null, "profile");
62+
} catch (OutOfMemoryError oom) {
63+
throw oom;
6264
} catch (Throwable t) {
6365
throw new RuntimeException(t);
6466
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ private void updateData(long now, long lastTimeWokeUp) {
529529

530530
// Clearing this list should "release" some memory that will be needed to dump the heap.
531531
// We could try to reallocate it again if we later notice memory pressure has subsided,
532-
// but that is risk. Further, leaving this released may help with the memory pressure.
532+
// but that is risky. Further, leaving this released may help with the memory pressure.
533533
reservedForDumpingHeap = null;
534534

535535
try {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private void fillEntries() {
129129
nextStartPosition = batch.nextStartPosition;
130130
entries = batch.entries.listIterator();
131131
currentBatch = batch;
132-
} catch (RuntimeException e) {
132+
} catch (RuntimeException | OutOfMemoryError e) {
133133
throw e;
134134
} catch (Throwable t) {
135135
throw new RuntimeException(t);

0 commit comments

Comments
 (0)