Skip to content

Commit e5db1b3

Browse files
authored
[Dataflow Java Streaming] Reset state using finally blocks instead of catching Exception, in cases where it may otherwise corrupt data structures if an OutOfMemoryError is thrown. (#37746)
1 parent c9ab4b6 commit e5db1b3

3 files changed

Lines changed: 61 additions & 46 deletions

File tree

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.runners.dataflow.worker;
1919

2020
import static org.apache.beam.runners.dataflow.util.Structs.getString;
21+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2122

2223
import com.google.auto.service.AutoService;
2324
import java.io.IOException;
@@ -51,16 +52,12 @@
5152
import org.slf4j.Logger;
5253
import org.slf4j.LoggerFactory;
5354

54-
@SuppressWarnings({
55-
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
56-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
57-
})
5855
class WindmillSink<T> extends Sink<WindowedValue<T>> {
5956

60-
private WindmillStreamWriter writer;
57+
private final WindmillStreamWriter writer;
6158
private final Coder<T> valueCoder;
6259
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
63-
private StreamingModeExecutionContext context;
60+
private final StreamingModeExecutionContext context;
6461
private static final Logger LOG = LoggerFactory.getLogger(WindmillSink.class);
6562

6663
WindmillSink(
@@ -81,6 +78,7 @@ private static ByteString encodeMetadata(
8178
PaneInfo paneInfo,
8279
BeamFnApi.Elements.ElementMetadata metadata)
8380
throws IOException {
81+
boolean resetNeeded = true;
8482
try {
8583
// element metadata is behind the experiment
8684
boolean elementMetadata = WindowedValues.WindowedValueCoder.isMetadataSupported();
@@ -92,10 +90,13 @@ private static ByteString encodeMetadata(
9290
PaneInfoCoder.INSTANCE.encode(paneInfo, stream);
9391
windowsCoder.encode(windows, stream, Coder.Context.OUTER);
9492
}
95-
return stream.toByteStringAndReset();
96-
} catch (Exception e) {
97-
stream.reset();
98-
throw e;
93+
ByteString result = stream.toByteStringAndReset();
94+
resetNeeded = false;
95+
return result;
96+
} finally {
97+
if (resetNeeded) {
98+
stream.reset();
99+
}
99100
}
100101
}
101102

@@ -150,6 +151,7 @@ public Map<String, SinkFactory> factories() {
150151
}
151152
}
152153

154+
@SuppressWarnings("rawtypes")
153155
public static class Factory implements SinkFactory {
154156

155157
@Override
@@ -166,7 +168,7 @@ public WindmillSink<?> create(
166168
return new WindmillSink<>(
167169
getString(spec, "stream_id"),
168170
typedCoder,
169-
(StreamingModeExecutionContext) executionContext);
171+
checkNotNull((StreamingModeExecutionContext) executionContext));
170172
}
171173
}
172174

@@ -198,17 +200,21 @@ private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws
198200
throw new IllegalStateException(
199201
"Expected output stream to be empty but had " + stream.toByteString());
200202
}
203+
boolean resetNeeded = true;
201204
try {
202205
coder.encode(object, stream, Coder.Context.OUTER);
203-
return stream.toByteStringAndReset();
204-
} catch (Exception e) {
205-
stream.reset();
206-
throw e;
206+
ByteString result = stream.toByteStringAndReset();
207+
resetNeeded = false;
208+
return result;
209+
} finally {
210+
if (resetNeeded) {
211+
stream.reset();
212+
}
207213
}
208214
}
209215

210216
@Override
211-
@SuppressWarnings("NestedInstanceOfConditions")
217+
@SuppressWarnings({"rawtypes", "NestedInstanceOfConditions"})
212218
public long add(WindowedValue<T> data) throws IOException {
213219
ByteString key, value;
214220
ByteString id = ByteString.EMPTY;
@@ -220,21 +226,21 @@ public long add(WindowedValue<T> data) throws IOException {
220226
stream, windowsCoder, data.getWindows(), data.getPaneInfo(), additionalMetadata);
221227
if (valueCoder instanceof KvCoder) {
222228
KvCoder kvCoder = (KvCoder) valueCoder;
223-
KV kv = (KV) data.getValue();
229+
KV kv = checkNotNull((KV) data.getValue());
224230
key = encode(kvCoder.getKeyCoder(), kv.getKey());
225231
Coder valueCoder = kvCoder.getValueCoder();
226232
// If ids are explicitly provided, use that instead of the windmill-generated id.
227233
// This is used when reading an UnboundedSource to deduplicate records.
228234
if (valueCoder instanceof ValueWithRecordId.ValueWithRecordIdCoder) {
229-
ValueWithRecordId valueAndId = (ValueWithRecordId) kv.getValue();
235+
ValueWithRecordId valueAndId = checkNotNull((ValueWithRecordId) kv.getValue());
230236
value =
231237
encode(((ValueWithRecordIdCoder) valueCoder).getValueCoder(), valueAndId.getValue());
232238
id = ByteString.copyFrom(valueAndId.getId());
233239
} else {
234240
value = encode(valueCoder, kv.getValue());
235241
}
236242
} else {
237-
key = context.getSerializedKey();
243+
key = checkNotNull(context.getSerializedKey());
238244
value = encode(valueCoder, data.getValue());
239245
}
240246
if (key.size() > context.getMaxOutputKeyBytes()) {
@@ -291,8 +297,9 @@ public long add(WindowedValue<T> data) throws IOException {
291297
}
292298
byte[] rawId = null;
293299

294-
if (data.getRecordId() != null) {
295-
rawId = data.getRecordId().getBytes(StandardCharsets.UTF_8);
300+
@Nullable String recordId = data.getRecordId();
301+
if (recordId != null) {
302+
rawId = recordId.getBytes(StandardCharsets.UTF_8);
296303
} else {
297304
rawId = context.getCurrentRecordId();
298305
}
@@ -303,8 +310,9 @@ public long add(WindowedValue<T> data) throws IOException {
303310
id = ByteString.copyFrom(rawId);
304311

305312
byte[] rawOffset = null;
306-
if (data.getRecordOffset() != null) {
307-
rawOffset = Longs.toByteArray(data.getRecordOffset());
313+
@Nullable Long recordOffset = data.getRecordOffset();
314+
if (recordOffset != null) {
315+
rawOffset = Longs.toByteArray(recordOffset);
308316
} else {
309317
rawOffset = context.getCurrentRecordOffset();
310318
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -268,29 +268,36 @@ private void startStream() {
268268
debugMetrics.recordStart();
269269
streamHandler.streamDebugMetrics.recordStart();
270270
currentPhysicalStream = streamHandler;
271-
currentPhysicalStreamForDebug.set(currentPhysicalStream);
272-
requestObserver.reset(physicalStreamFactory.apply(new ResponseObserver(streamHandler)));
273-
onFlushPending(true);
274-
if (clientClosed) {
275-
// The logical stream is half-closed so after flushing the remaining requests close the
276-
// physical stream.
277-
streamHandler.streamDebugMetrics.recordHalfClose();
278-
requestObserver.onCompleted();
279-
} else if (!halfClosePhysicalStreamAfter.isZero()) {
280-
halfCloseFuture =
281-
executor.schedule(
282-
() -> onHalfClosePhysicalStreamTimeout(streamHandler),
283-
halfClosePhysicalStreamAfter.getSeconds(),
284-
TimeUnit.SECONDS);
271+
boolean resetCurrentPhysicalStream = true;
272+
try {
273+
currentPhysicalStreamForDebug.set(currentPhysicalStream);
274+
requestObserver.reset(physicalStreamFactory.apply(new ResponseObserver(streamHandler)));
275+
onFlushPending(true);
276+
if (clientClosed) {
277+
// The logical stream is half-closed so after flushing the remaining requests close
278+
// the
279+
// physical stream.
280+
streamHandler.streamDebugMetrics.recordHalfClose();
281+
requestObserver.onCompleted();
282+
} else if (!halfClosePhysicalStreamAfter.isZero()) {
283+
halfCloseFuture =
284+
executor.schedule(
285+
() -> onHalfClosePhysicalStreamTimeout(streamHandler),
286+
halfClosePhysicalStreamAfter.getSeconds(),
287+
TimeUnit.SECONDS);
288+
}
289+
resetCurrentPhysicalStream = false;
290+
} finally {
291+
if (resetCurrentPhysicalStream) {
292+
clearCurrentPhysicalStream(true);
293+
}
285294
}
286295
return;
287296
} catch (WindmillStreamShutdownException e) {
288297
logger.debug("Stream was shutdown while creating new stream.", e);
289-
clearCurrentPhysicalStream(true);
290298
break;
291299
} catch (Exception e) {
292300
logger.error("Failed to create new stream, retrying: ", e);
293-
clearCurrentPhysicalStream(true);
294301
debugMetrics.recordRestartReason("Failed to create new stream, retrying: " + e);
295302
}
296303
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -289,11 +289,11 @@ protected synchronized void onFlushPending(boolean isNewStream)
289289
// Notify all waiters with requests in this batch as well as the sender
290290
// of the next batch (if one exists).
291291
batch.notifySent();
292-
} catch (Exception e) {
293-
LOG.debug("Batch failed to send on new stream", e);
292+
} catch (Throwable t) {
294293
// Free waiters if the send() failed.
295294
batch.notifyFailed();
296-
throw e;
295+
LOG.debug("Batch failed to send on new stream", t);
296+
throw t;
297297
}
298298
}
299299
}
@@ -535,12 +535,12 @@ private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamS
535535
// Notify all waiters with requests in this batch as well as the sender
536536
// of the next batch (if one exists).
537537
batch.notifySent();
538-
} catch (Exception e) {
539-
LOG.debug("Batch failed to send", e);
538+
} catch (Throwable t) {
540539
// Free waiters if the send() failed.
541540
batch.notifyFailed();
542-
// Propagate the exception to the calling thread.
543-
throw e;
541+
LOG.debug("Batch failed to send", t);
542+
// Propagate the exception/error to the calling thread.
543+
throw t;
544544
}
545545
}
546546

0 commit comments

Comments (0)