Skip to content

Commit c9ab4b6

Browse files
Add default_pickle_library_override delegation to InteractiveRunner (#37752)
* Add default_pickle_library_override delegation to InteractiveRunner * pylint
1 parent 3197d88 commit c9ab4b6

2 files changed

Lines changed: 26 additions & 0 deletions

File tree

sdks/python/apache_beam/runners/interactive/interactive_runner.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ def is_fnapi_compatible(self):
9393
# return self._underlying_runner.is_fnapi_compatible()
9494
return False
9595

96+
def default_pickle_library_override(self):
97+
"""Delegates pickler override to the underlying runner."""
98+
if hasattr(self._underlying_runner, 'default_pickle_library_override'):
99+
return self._underlying_runner.default_pickle_library_override()
100+
return super().default_pickle_library_override()
101+
96102
def set_render_option(self, render_option):
97103
"""Sets the rendering option.
98104

sdks/python/apache_beam/runners/interactive/interactive_runner_test.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
4444
from apache_beam.runners.interactive.testing.mock_env import isolated_env
4545
from apache_beam.runners.portability.flink_runner import FlinkRunner
46+
from apache_beam.runners.runner import PipelineRunner
4647
from apache_beam.testing.test_stream import TestStream
4748
from apache_beam.transforms.window import GlobalWindow
4849
from apache_beam.transforms.window import IntervalWindow
@@ -532,6 +533,25 @@ def test_defaults_to_efficient_cache(self):
532533
# Despite (highly redundant) windowing information, the cache is small.
533534
self.assertLess(size, sum(inputs))
534535

536+
def test_default_pickle_library_override_delegates(self):
537+
mock_underlying = unittest.mock.MagicMock(spec=PipelineRunner)
538+
mock_underlying.default_pickle_library_override.return_value = 'cloudpickle'
539+
540+
runner = interactive_runner.InteractiveRunner(
541+
underlying_runner=mock_underlying)
542+
543+
self.assertEqual(runner.default_pickle_library_override(), 'cloudpickle')
544+
545+
def test_default_pickle_library_override_fallback(self):
546+
mock_underlying = unittest.mock.MagicMock(spec=PipelineRunner)
547+
del mock_underlying.default_pickle_library_override
548+
549+
runner = interactive_runner.InteractiveRunner(
550+
underlying_runner=mock_underlying)
551+
552+
# Should fallback to the base class implementation without crashing
553+
self.assertIsNone(runner.default_pickle_library_override())
554+
535555

536556
@unittest.skipIf(
537557
not ie.current_env().is_interactive_ready,

0 commit comments

Comments
 (0)