Skip to content

Commit fa9d190

Browse files
authored
SDF draining in dataflow runner v1 (#37831)
1 parent 1007f73 commit fa9d190

6 files changed

Lines changed: 93 additions & 24 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
197197
processContext, OutputBuilderSuppliers.supplierForElement(element));
198198
}
199199

200+
@Override
201+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
202+
return processContext.causedByDrain();
203+
}
204+
200205
@Override
201206
public RestrictionTracker<?, ?> restrictionTracker() {
202207
return processContext.tracker;

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,19 @@ public String getErrorContext() {
462462
elementState.readLater();
463463
restrictionState.readLater();
464464
watermarkEstimatorState.readLater();
465-
elementAndRestriction = KV.of(elementState.read(), restrictionState.read());
465+
WindowedValue<InputT> read = elementState.read();
466+
if (timer.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN) {
467+
read =
468+
WindowedValues.of(
469+
read.getValue(),
470+
read.getTimestamp(),
471+
read.getWindows(),
472+
read.getPaneInfo(),
473+
read.getRecordId(),
474+
read.getRecordOffset(),
475+
CausedByDrain.CAUSED_BY_DRAIN);
476+
}
477+
elementAndRestriction = KV.of(read, restrictionState.read());
466478
watermarkEstimatorStateT = watermarkEstimatorState.read();
467479
}
468480

@@ -574,7 +586,7 @@ public String getErrorContext() {
574586
result =
575587
processElementInvoker.invokeProcessElement(
576588
invoker,
577-
elementAndRestriction.getKey(),
589+
elementAndRestriction.getKey(), // windowed value
578590
tracker,
579591
watermarkEstimator,
580592
sideInputMapping);
@@ -598,15 +610,18 @@ public String getErrorContext() {
598610
Instant wakeupTime =
599611
timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
600612
holdState.add(futureOutputWatermark);
601-
// Set a timer to continue processing this element.
602-
// todo radoslws@ decide if draining should be set on timer
603-
timerInternals.setTimer(
604-
TimerInternals.TimerData.of(
605-
stateNamespace,
606-
wakeupTime,
607-
wakeupTime,
608-
TimeDomain.PROCESSING_TIME,
609-
timer == null ? CausedByDrain.NORMAL : timer.causedByDrain()));
613+
// Set a timer to continue processing this element, but only when no drain
614+
if (timer == null || timer.causedByDrain() == CausedByDrain.NORMAL) {
615+
timerInternals.setTimer(
616+
TimerInternals.TimerData.of(
617+
stateNamespace,
618+
wakeupTime,
619+
wakeupTime,
620+
TimeDomain.PROCESSING_TIME,
621+
CausedByDrain.NORMAL));
622+
} else {
623+
holdState.clear();
624+
}
610625
}
611626

612627
private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(

runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.beam.sdk.coders.SerializableCoder;
4646
import org.apache.beam.sdk.coders.VoidCoder;
4747
import org.apache.beam.sdk.io.range.OffsetRange;
48+
import org.apache.beam.sdk.state.TimeDomain;
4849
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
4950
import org.apache.beam.sdk.testing.TestPipeline;
5051
import org.apache.beam.sdk.transforms.DoFn;
@@ -61,6 +62,7 @@
6162
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
6263
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
6364
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
65+
import org.apache.beam.sdk.values.CausedByDrain;
6466
import org.apache.beam.sdk.values.KV;
6567
import org.apache.beam.sdk.values.PCollectionView;
6668
import org.apache.beam.sdk.values.TimestampedValue;
@@ -223,6 +225,22 @@ void startElement(WindowedValue<KV<InputT, RestrictionT>> windowedValue) throws
223225
"key".getBytes(StandardCharsets.UTF_8), Collections.singletonList(windowedValue)));
224226
}
225227

228+
boolean advanceDrain() throws Exception {
229+
230+
List<TimerInternals.TimerData> timers = new ArrayList<>();
231+
timers.add(
232+
TimerInternals.TimerData.of(
233+
"",
234+
StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
235+
new Instant(Long.MAX_VALUE),
236+
new Instant(Long.MAX_VALUE),
237+
TimeDomain.EVENT_TIME,
238+
CausedByDrain.CAUSED_BY_DRAIN));
239+
tester.processElement(
240+
KeyedWorkItems.timersWorkItem("key".getBytes(StandardCharsets.UTF_8), timers));
241+
return true;
242+
}
243+
226244
/**
227245
* Advances processing time by a given duration and, if any timers fired, performs a non-seed
228246
* {@link DoFn.ProcessElement} call, feeding it the timers.
@@ -334,10 +352,14 @@ private static class WatermarkUpdateFn extends DoFn<Instant, String> {
334352
public void process(
335353
ProcessContext c,
336354
RestrictionTracker<OffsetRange, Long> tracker,
337-
ManualWatermarkEstimator<Instant> watermarkEstimator) {
355+
ManualWatermarkEstimator<Instant> watermarkEstimator,
356+
CausedByDrain causedByDrain) {
338357
for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
339358
watermarkEstimator.setWatermark(c.element().plus(Duration.standardSeconds(i)));
340359
c.output(String.valueOf(i));
360+
if (causedByDrain == CausedByDrain.CAUSED_BY_DRAIN) {
361+
c.output("drains");
362+
}
341363
}
342364
}
343365

@@ -363,6 +385,28 @@ public WatermarkEstimators.Manual newWatermarkEstimator(
363385
}
364386
}
365387

388+
@Test
389+
public void testDrains() throws Exception {
390+
DoFn<Instant, String> fn = new WatermarkUpdateFn();
391+
Instant base = Instant.now();
392+
393+
ProcessFnTester<Instant, String, OffsetRange, Long, Instant> tester =
394+
new ProcessFnTester<>(
395+
base,
396+
fn,
397+
InstantCoder.of(),
398+
SerializableCoder.of(OffsetRange.class),
399+
InstantCoder.of(),
400+
3,
401+
MAX_BUNDLE_DURATION);
402+
403+
tester.startElement(base, new OffsetRange(0, 8));
404+
assertThat(tester.takeOutputElements(), hasItems("0", "1", "2"));
405+
assertEquals(base.plus(Duration.standardSeconds(2)), tester.getWatermarkHold());
406+
assertTrue(tester.advanceDrain());
407+
assertThat(tester.takeOutputElements(), hasItems("3", "4", "drains", "drains"));
408+
}
409+
366410
@Test
367411
public void testUpdatesWatermark() throws Exception {
368412
DoFn<Instant, String> fn = new WatermarkUpdateFn();

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,11 @@ public BoundedWindow window() {
237237
return window;
238238
}
239239

240+
@Override
241+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
242+
return processContext.causedByDrain();
243+
}
244+
240245
@Override
241246
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
242247
return processContext.pane();
@@ -598,7 +603,7 @@ public Long currentRecordOffset() {
598603

599604
@Override
600605
public CausedByDrain causedByDrain() {
601-
return CausedByDrain.NORMAL;
606+
return element.getCausedByDrain();
602607
}
603608

604609
@Override

sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public static <T> WindowedValue<T> of(
313313

314314
boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
315315
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
316-
return valueInGlobalWindow(value, paneInfo);
316+
return new ValueInGlobalWindow<>(value, paneInfo, null, null, causedByDrain);
317317
} else if (isGlobal) {
318318
return new TimestampedValueInGlobalWindow<>(
319319
value, timestamp, paneInfo, null, null, causedByDrain);

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,11 +1804,6 @@ public <T> void outputWindowedValue(
18041804
outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo));
18051805
}
18061806

1807-
@Override
1808-
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
1809-
return currentElement.causedByDrain();
1810-
}
1811-
18121807
@Override
18131808
public State state(String stateId, boolean alwaysFetched) {
18141809
StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId);
@@ -1860,11 +1855,6 @@ public TimerMap timerFamily(String timerFamilyId) {
18601855
currentElement.getTimestamp(),
18611856
currentElement.getPaneInfo());
18621857
}
1863-
1864-
@Override
1865-
public CausedByDrain causedByDrain() {
1866-
return currentElement.causedByDrain();
1867-
}
18681858
}
18691859

18701860
/** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */
@@ -2254,6 +2244,16 @@ public Object watermarkEstimatorState() {
22542244
public WatermarkEstimator<?> watermarkEstimator() {
22552245
return currentWatermarkEstimator;
22562246
}
2247+
2248+
@Override
2249+
public CausedByDrain causedByDrain() {
2250+
return currentElement.causedByDrain();
2251+
}
2252+
2253+
@Override
2254+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
2255+
return currentElement.causedByDrain();
2256+
}
22572257
}
22582258

22592259
/**

0 commit comments

Comments
 (0)