Skip to content

Commit 7620a93

Browse files
authored
Fix #37736: Allow composite transforms to use implicit input chaining (#37861)
* Fix #37736: Allow composite transforms to use implicit input chaining When a composite transform has no explicit inputs/outputs on its sub-transforms, automatically chain them similar to how 'chain' type transforms work. Added test_composite_implicit_input_chaining to verify the fix. * Fix line-too-long lint error in yaml_transform.py * Fix yapf formatting in yaml_transform.py * Fix composite implicit input chaining logic * Fix composite implicit input chaining - delete empty input from spec * Fix composite implicit input - reference pipeline input directly * Fix composite implicit input - reference pipeline input * Revert "Fix composite implicit input - reference pipeline input" This reverts commit ceb0ac1. * Fix composite implicit input chaining from pipeline input This fix addresses the issue where composite transforms with no explicit input specification were failing to receive inputs from the pipeline. Key changes: 1. Fixed has_explicit_io check to use is_empty() instead of just checking key presence - this properly treats {} as 'no explicit input' 2. Added composite_has_input check to only do implicit chaining when the composite has an input to chain from 3. Fixed inner_scope_inputs computation to use parent scope's inputs when the composite has no explicit input 4. Fixed output handling to use is_empty() check (normalization sets {}) 5. Fixed final return to correctly resolve scope inputs vs transform outputs * Apply yapf formatting and fix pylint line length issues * Address review: revert unnecessary variable assignment
1 parent 0afbdf6 commit 7620a93

2 files changed

Lines changed: 99 additions & 9 deletions

File tree

sdks/python/apache_beam/yaml/yaml_transform.py

Lines changed: 79 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -796,12 +796,72 @@ def to_row(element):
796796
def expand_composite_transform(spec, scope):
797797
spec = normalize_inputs_outputs(normalize_source_sink(spec))
798798

799+
original_transforms = spec['transforms']
800+
# Check if any transform has a NON-EMPTY explicit input or output.
801+
# Note: {} (empty dict) means "no explicit input specified" and should
802+
# NOT count as having explicit io.
803+
# However, if the composite has no input, we can't do implicit chaining.
804+
has_explicit_io = any(
805+
io is not None and not is_empty(t.get(io, {}))
806+
for t in original_transforms for io in ('input', 'output'))
807+
808+
# If the composite has no input, we can't do implicit chaining
809+
composite_has_input = not is_empty(spec.get('input', {}))
810+
811+
# Only do implicit chaining if:
812+
# 1. No transform has explicit io, AND
813+
# 2. The composite has an input to chain from
814+
if not has_explicit_io and composite_has_input:
815+
new_transforms = []
816+
for ix, transform in enumerate(original_transforms):
817+
transform = dict(transform)
818+
if ix == 0:
819+
composite_input = spec.get('input', {})
820+
if is_explicitly_empty(composite_input):
821+
transform['input'] = composite_input
822+
elif is_empty(composite_input):
823+
# No explicit input - the composite input IS the pipeline input.
824+
# Reference the 'input' key from the Scope's inputs.
825+
transform['input'] = 'input'
826+
else:
827+
transform['input'] = {key: key for key in composite_input.keys()}
828+
else:
829+
transform['input'] = new_transforms[-1]['__uuid__']
830+
new_transforms.append(transform)
831+
832+
if new_transforms:
833+
spec = dict(spec, transforms=new_transforms)
834+
# Check if output is empty, not just present (normalization sets it to {})
835+
if is_empty(spec.get('output', {})):
836+
spec['output'] = {
837+
'__implicit_outputs__': new_transforms[-1]['__uuid__']
838+
}
839+
840+
# Compute the inputs for the inner scope.
841+
# If the composite has an empty input dict ({}), it means the composite
842+
# should use the parent scope's inputs directly.
843+
composite_input = spec.get('input', {})
844+
845+
if is_empty(composite_input):
846+
# No explicit input - use the parent scope's inputs directly
847+
inner_scope_inputs = dict(scope._inputs)
848+
else:
849+
# The composite has explicit input references
850+
# They can reference either:
851+
# 1. A parent scope input (e.g., 'input' key in scope._inputs)
852+
# 2. A transform output (e.g., 'uuid' -> the output of a transform)
853+
inner_scope_inputs = {}
854+
for key, value in composite_input.items():
855+
if isinstance(value, str) and value in scope._inputs:
856+
# Reference to a parent scope input
857+
inner_scope_inputs[key] = scope._inputs[value]
858+
else:
859+
# Reference to a transform output
860+
inner_scope_inputs[key] = scope.get_pcollection(value)
861+
799862
inner_scope = Scope(
800863
scope.root,
801-
{
802-
key: scope.get_pcollection(value)
803-
for (key, value) in empty_if_explicitly_empty(spec['input']).items()
804-
},
864+
inner_scope_inputs,
805865
spec['transforms'],
806866
# TODO(robertwb): Are scoped providers ever used? Worth supporting?
807867
yaml_provider.merge_providers(
@@ -814,7 +874,8 @@ class CompositePTransform(beam.PTransform):
814874
def expand(inputs):
815875
inner_scope.compute_all()
816876
if '__implicit_outputs__' in spec['output']:
817-
return inner_scope.get_outputs(spec['output']['__implicit_outputs__'])
877+
result = inner_scope.get_outputs(spec['output']['__implicit_outputs__'])
878+
return result
818879
else:
819880
return {
820881
key: inner_scope.get_pcollection(value)
@@ -826,16 +887,25 @@ def expand(inputs):
826887
transform = transform.with_resource_hints(
827888
**SafeLineLoader.strip_metadata(spec['resource_hints']))
828889

890+
# Always set a name for the composite to ensure proper return value
829891
if 'name' not in spec:
830892
spec['name'] = 'Composite'
831893
if spec['name'] is None: # top-level pipeline, don't nest
832894
return transform.expand(None)
833895
else:
834896
_LOGGER.info("Expanding %s ", identify_object(spec))
835-
return ({
836-
key: scope.get_pcollection(value)
837-
for (key, value) in empty_if_explicitly_empty(spec['input']).items()
838-
} or scope.root) | scope.unique_name(spec, None) >> transform
897+
# When the input references a scope input (not a transform output),
898+
# we need to use the scope's inputs directly
899+
input_dict = {}
900+
for key, value in empty_if_explicitly_empty(spec['input']).items():
901+
if isinstance(value, str) and value in scope._inputs:
902+
# Reference to a scope input
903+
input_dict[key] = scope._inputs[value]
904+
else:
905+
# Reference to a transform output
906+
input_dict[key] = scope.get_pcollection(value)
907+
return (input_dict or
908+
scope.root) | scope.unique_name(spec, None) >> transform
839909

840910

841911
def expand_chain_transform(spec, scope):

sdks/python/apache_beam/yaml/yaml_transform_test.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,26 @@ def test_composite(self):
122122
providers=TEST_PROVIDERS)
123123
assert_that(result, equal_to([1, 4, 9, 1, 8, 27]))
124124

125+
def test_composite_implicit_input_chaining(self):
126+
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
127+
pickle_library='cloudpickle')) as p:
128+
elements = p | beam.Create([1, 2, 3])
129+
result = elements | YamlTransform(
130+
'''
131+
type: composite
132+
transforms:
133+
- type: PyMap
134+
name: Square
135+
config:
136+
fn: "lambda x: x * x"
137+
- type: PyMap
138+
name: Increment
139+
config:
140+
fn: "lambda x: x + 1"
141+
''',
142+
providers=TEST_PROVIDERS)
143+
assert_that(result, equal_to([2, 5, 10]))
144+
125145
def test_chain_with_input(self):
126146
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
127147
pickle_library='cloudpickle')) as p:

0 commit comments

Comments
 (0)