Skip to content

Commit 1b523ef

Browse files
claudevdm and Claude authored
Allow sliding windows + combine with hot key fanout (#37899)
* init
* Don't assign windows in sliding windows case
* comments

---------

Co-authored-by: Claude <cvandermerwe@google.com>
1 parent 6f1f242 commit 1b523ef

2 files changed

Lines changed: 81 additions & 30 deletions

File tree

sdks/python/apache_beam/transforms/combiners_test.py

Lines changed: 55 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -584,26 +584,63 @@ def has_expected_values(actual):
584584

585585
assert_that(result, has_expected_values)
586586

587-
def test_combining_with_sliding_windows_and_fanout_raises_error(self):
587+
def test_combining_with_sliding_windows_and_fanout(self):
588588
options = PipelineOptions()
589589
options.view_as(StandardOptions).streaming = True
590-
with self.assertRaises(ValueError):
591-
with TestPipeline(options=options) as p:
592-
_ = (
593-
p
594-
| beam.Create([
595-
window.TimestampedValue(0, Timestamp(seconds=1666707510)),
596-
window.TimestampedValue(1, Timestamp(seconds=1666707511)),
597-
window.TimestampedValue(2, Timestamp(seconds=1666707512)),
598-
window.TimestampedValue(3, Timestamp(seconds=1666707513)),
599-
window.TimestampedValue(5, Timestamp(seconds=1666707515)),
600-
window.TimestampedValue(6, Timestamp(seconds=1666707516)),
601-
window.TimestampedValue(7, Timestamp(seconds=1666707517)),
602-
window.TimestampedValue(8, Timestamp(seconds=1666707518))
603-
])
604-
| beam.WindowInto(window.SlidingWindows(10, 5))
605-
| beam.CombineGlobally(beam.combiners.ToListCombineFn()).
606-
without_defaults().with_fanout(7))
590+
with TestPipeline(options=options) as p:
591+
592+
def has_expected_values(actual):
593+
from hamcrest.core import assert_that as hamcrest_assert
594+
from hamcrest.library.collection import only_contains
595+
ordered = sorted(actual)
596+
hamcrest_assert(
597+
ordered,
598+
only_contains([0, 1, 2, 3], [0, 1, 2, 3, 5, 6, 7, 8], [5, 6, 7, 8]))
599+
600+
result = (
601+
p
602+
| beam.Create([
603+
window.TimestampedValue(0, Timestamp(seconds=1666707510)),
604+
window.TimestampedValue(1, Timestamp(seconds=1666707511)),
605+
window.TimestampedValue(2, Timestamp(seconds=1666707512)),
606+
window.TimestampedValue(3, Timestamp(seconds=1666707513)),
607+
window.TimestampedValue(5, Timestamp(seconds=1666707515)),
608+
window.TimestampedValue(6, Timestamp(seconds=1666707516)),
609+
window.TimestampedValue(7, Timestamp(seconds=1666707517)),
610+
window.TimestampedValue(8, Timestamp(seconds=1666707518))
611+
])
612+
| beam.WindowInto(window.SlidingWindows(10, 5))
613+
| beam.CombineGlobally(beam.combiners.ToListCombineFn()).
614+
without_defaults().with_fanout(7))
615+
assert_that(result, has_expected_values)
616+
617+
def test_combining_with_session_windows_and_fanout(self):
618+
options = PipelineOptions()
619+
options.view_as(StandardOptions).streaming = True
620+
with TestPipeline(options=options) as p:
621+
622+
def has_expected_values(actual):
623+
from hamcrest.core import assert_that as hamcrest_assert
624+
from hamcrest.library.collection import only_contains
625+
ordered = sorted(actual)
626+
hamcrest_assert(ordered, only_contains([0, 1, 2, 3], [5, 6, 7, 8]))
627+
628+
result = (
629+
p
630+
| beam.Create([
631+
window.TimestampedValue(0, Timestamp(seconds=1666707510)),
632+
window.TimestampedValue(1, Timestamp(seconds=1666707511)),
633+
window.TimestampedValue(2, Timestamp(seconds=1666707512)),
634+
window.TimestampedValue(3, Timestamp(seconds=1666707513)),
635+
window.TimestampedValue(5, Timestamp(seconds=1666707515)),
636+
window.TimestampedValue(6, Timestamp(seconds=1666707516)),
637+
window.TimestampedValue(7, Timestamp(seconds=1666707517)),
638+
window.TimestampedValue(8, Timestamp(seconds=1666707518))
639+
])
640+
| beam.WindowInto(window.Sessions(2))
641+
| beam.CombineGlobally(beam.combiners.ToListCombineFn()).
642+
without_defaults().with_fanout(7))
643+
assert_that(result, has_expected_values)
607644

608645
def test_MeanCombineFn_combine(self):
609646
with TestPipeline() as p:

sdks/python/apache_beam/transforms/core.py

Lines changed: 26 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -3271,10 +3271,7 @@ def expand(self, pcoll):
32713271
combine_fn = self._combine_fn
32723272
fanout_fn = self._fanout_fn
32733273

3274-
if isinstance(pcoll.windowing.windowfn, SlidingWindows):
3275-
raise ValueError(
3276-
'CombinePerKey.with_hot_key_fanout does not yet work properly with '
3277-
'SlidingWindows. See: https://github.com/apache/beam/issues/20528')
3274+
use_direct_windowing = isinstance(pcoll.windowing.windowfn, SlidingWindows)
32783275

32793276
class SplitHotCold(DoFn):
32803277
def start_bundle(self):
@@ -3344,14 +3341,31 @@ def StripNonce(nonce_key_value):
33443341

33453342
cold, hot = pcoll | ParDo(SplitHotCold()).with_outputs('hot', main='cold')
33463343
cold.element_type = typehints.Any # No multi-output type hints.
3347-
precombined_hot = (
3348-
hot
3349-
# Avoid double counting that may happen with stacked accumulating mode.
3350-
| 'WindowIntoDiscarding' >> WindowInto(
3351-
pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING)
3352-
| CombinePerKey(PreCombineFn())
3353-
| Map(StripNonce)
3354-
| 'WindowIntoOriginal' >> WindowInto(pcoll.windowing))
3344+
3345+
if use_direct_windowing:
3346+
# For SlidingWindows, swap windowing strategy metadata directly on the
3347+
# PCollection without re-assigning windows. This mirrors Java's
3348+
# setWindowingStrategyInternal(). Using WindowInto would call
3349+
# windowfn.assign() which re-evaluates window assignments from
3350+
# timestamps — with SlidingWindows, this causes accumulators to leak
3351+
# into adjacent overlapping windows.
3352+
if pcoll.windowing.accumulation_mode == AccumulationMode.ACCUMULATING:
3353+
discarding_windowing = copy.copy(pcoll.windowing)
3354+
discarding_windowing.accumulation_mode = AccumulationMode.DISCARDING
3355+
hot._windowing = discarding_windowing
3356+
precombined_hot = (hot | CombinePerKey(PreCombineFn()) | Map(StripNonce))
3357+
precombined_hot._windowing = pcoll.windowing
3358+
else:
3359+
precombined_hot = (
3360+
hot
3361+
# Avoid double counting that may happen with stacked accumulating
3362+
# mode.
3363+
| 'WindowIntoDiscarding' >> WindowInto(
3364+
pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING)
3365+
| CombinePerKey(PreCombineFn())
3366+
| Map(StripNonce)
3367+
| 'WindowIntoOriginal' >> WindowInto(pcoll.windowing))
3368+
33553369
return ((cold, precombined_hot)
33563370
| Flatten()
33573371
| CombinePerKey(PostCombineFn()))

0 commit comments

Comments
 (0)