Commit 9baaf4d: Update aggregation examples
1 parent e7b6714

3 files changed: 41 additions & 214 deletions

examples/workflow_aggregator_llm_summary.py renamed to examples/workflow_aggregator_summary.py

Lines changed: 29 additions & 26 deletions
@@ -7,14 +7,16 @@
 Aggregation technique: LLM synthesis (Agent as post-processor).
 
 Run:
-    uv run examples/workflow_aggregator_llm_summary.py
-    uv run examples/workflow_aggregator_llm_summary.py --devui (opens DevUI at http://localhost:8101)
+    uv run examples/workflow_aggregator_summary.py
+    uv run examples/workflow_aggregator_summary.py --devui (opens DevUI at http://localhost:8101)
 """
 
 import asyncio
 import os
 import sys
 
+from typing import Never
+
 from agent_framework import Agent, AgentExecutorResponse, Executor, WorkflowBuilder, WorkflowContext, handler
 from agent_framework.openai import OpenAIChatClient
 from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
@@ -53,20 +55,32 @@ async def dispatch(self, prompt: str, ctx: WorkflowContext[str]) -> None:
         await ctx.send_message(prompt)
 
 
-class FormatBranchResults(Executor):
-    """Fan-in collector that formats branch outputs into a single prompt."""
+class SummarizerExecutor(Executor):
+    """Fan-in aggregator that synthesizes expert outputs via a wrapped Agent."""
+
+    agent: Agent
+
+    def __init__(self, client: OpenAIChatClient, id: str = "Summarizer"):
+        super().__init__(id=id)
+        self.agent = Agent(
+            client=client,
+            name=id,
+            instructions=(
+                "You receive analysis from three domain experts (researcher, marketer, legal). "
+                "Synthesize their combined insights into a concise 3-sentence executive brief "
+                "that a CEO could read in 30 seconds. Do not repeat the raw analysis."
+            ),
+        )
 
     @handler
-    async def format(
-        self,
-        results: list[AgentExecutorResponse],
-        ctx: WorkflowContext[str],
-    ) -> None:
-        """Combine expert outputs into labeled sections for the summarizer."""
+    async def run(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
+        """Format branch outputs and feed them to the summarizer Agent."""
         sections = []
         for result in results:
             sections.append(f"[{result.executor_id}]\n{result.agent_response.text}")
-        await ctx.send_message("\n\n---\n\n".join(sections))
+        combined = "\n\n---\n\n".join(sections)
+        response = await self.agent.run(combined)
+        await ctx.yield_output(response.text)
 
 
 dispatcher = DispatchPrompt(id="dispatcher")
@@ -101,19 +115,9 @@ async def format(
     ),
 )
 
-formatter = FormatBranchResults(id="formatter")
-
-# The summarizer Agent is the final node — it receives the formatted expert
-# outputs and synthesizes them into a concise executive brief.
-summarizer = Agent(
-    client=client,
-    name="Summarizer",
-    instructions=(
-        "You receive analysis from three domain experts (researcher, marketer, legal). "
-        "Synthesize their combined insights into a concise 3-sentence executive brief "
-        "that a CEO could read in 30 seconds. Do not repeat the raw analysis."
-    ),
-)
+# The summarizer Executor wraps an Agent to handle fan-in directly —
+# it formats the collected expert outputs and synthesizes a brief.
+summarizer = SummarizerExecutor(client=client)
 
 workflow = (
     WorkflowBuilder(
@@ -123,8 +127,7 @@ async def format(
         output_executors=[summarizer],
     )
     .add_fan_out_edges(dispatcher, [researcher, marketer, legal])
-    .add_fan_in_edges([researcher, marketer, legal], formatter)
-    .add_edge(formatter, summarizer)
+    .add_fan_in_edges([researcher, marketer, legal], summarizer)
     .build()
 )
 
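The rewired example fans a single prompt out to three expert agents and fans their responses into one summarizer node. For readers who want the shape without the framework, here is a framework-free sketch using plain asyncio; every function name below is an illustrative stand-in, not the `agent_framework` API:

```python
import asyncio

# Hypothetical stand-ins for the three expert Agent nodes.
async def researcher(prompt: str) -> str:
    return f"[researcher]\nfindings on: {prompt}"

async def marketer(prompt: str) -> str:
    return f"[marketer]\nangles for: {prompt}"

async def legal(prompt: str) -> str:
    return f"[legal]\nrisks in: {prompt}"

async def summarize(combined: str) -> str:
    # Stand-in for the wrapped summarizer Agent call.
    return f"BRIEF ({combined.count('[')} expert sections)"

async def run(prompt: str) -> str:
    # Fan-out: every branch receives the same prompt concurrently.
    results = await asyncio.gather(researcher(prompt), marketer(prompt), legal(prompt))
    # Fan-in: label and join branch outputs, then synthesize once.
    combined = "\n\n---\n\n".join(results)
    return await summarize(combined)

print(asyncio.run(run("launch plan")))
```

`asyncio.gather` preserves argument order, which is why the joined sections always appear researcher, marketer, legal; the workflow builder's fan-in edge group plays the same collecting role.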

examples/workflow_rag_ingest_parallel.py

Lines changed: 0 additions & 176 deletions
This file was deleted.

plan.md

Lines changed: 12 additions & 12 deletions
@@ -8,7 +8,6 @@ This plan is organized by proposed example file. For each file: (1) source examp
 3. `workflow_aggregator_structured.py` (Pydantic structured extraction)
 4. `workflow_aggregator_voting.py` (ensemble classification — majority vote)
 5. `workflow_aggregator_ranked.py` (generate N candidates — score & rank)
-6. `workflow_rag_ingest_parallel.py` (parallelized version of `workflow_rag_ingest.py`)
 7. `workflow_multi_selection_edge_group.py`
 8. `workflow_agents_concurrent.py`
 9. `workflow_concurrent_custom_aggregator.py`
@@ -63,16 +62,6 @@ This plan is organized by proposed example file. For each file: (1) source examp
 - Aggregator (Agent or Executor) scores each proposal on criteria (creativity, memorability, brand fit) and yields a ranked list.
 - Teaching point: aggregation can be evaluative — the fan-in step judges via LLM, not just collects. Pattern applies to "generate N candidates, pick the best."
 
-### 6) `/workspace/examples/workflow_rag_ingest_parallel.py`
-- **Pull from**
-  - Local: `/workspace/examples/workflow_rag_ingest.py` (extract/chunk/embed pipeline and provider bootstrap)
-  - Upstream: `python/samples/03-workflows/parallelism/fan_out_fan_in_edges.py`, `python/samples/03-workflows/parallelism/map_reduce_and_visualization.py`
-  - Docs: Edges and parallelism concepts (`add_fan_out_edges`, `add_fan_in_edges`)
-- **Workflow accomplishes**
-  - Parallelizes embedding generation by fan-out over chunk batches and fan-in aggregation of embedded chunks.
-  - Keeps the same business goal as the current ingest demo (RAG ingestion) while changing the execution model.
-  - Provides a direct comparison against `workflow_agents_concurrent.py` to show domain-level parallelism (specialist perspectives) vs data-level parallelism (batch processing).
-
 ### 7) `/workspace/examples/workflow_multi_selection_edge_group.py`
 - **Pull from**
   - Local: `/workspace/examples/workflow_switch_case.py` (edge-group style, structured routing)
@@ -141,6 +130,17 @@ This plan is organized by proposed example file. For each file: (1) source examp
 - **Purpose**: contrasts language-level concurrency (`asyncio.gather`) with framework-level orchestration.
 - **Why appendix**: can distract from the core built-in workflow builder narrative for audiences new to async Python.
 
+### `/workspace/examples/workflow_rag_ingest_parallel.py`
+- **Pull from**
+  - Local: `/workspace/examples/workflow_rag_ingest.py` (extract/chunk/embed pipeline and provider bootstrap)
+  - Upstream: `python/samples/03-workflows/parallelism/fan_out_fan_in_edges.py`, `python/samples/03-workflows/parallelism/map_reduce_and_visualization.py`
+  - Docs: Edges and parallelism concepts (`add_fan_out_edges`, `add_fan_in_edges`)
+- **Workflow accomplishes**
+  - Parallelizes embedding generation by fan-out over chunk batches and fan-in aggregation of embedded chunks.
+  - Keeps the same business goal as the current ingest demo (RAG ingestion) while changing the execution model.
+  - Provides a direct comparison against `workflow_agents_concurrent.py` to show domain-level parallelism (specialist perspectives) vs data-level parallelism (batch processing).
+
+
 ## Scope Boundaries
 - Include only built-in workflow builder content for Session 5.
 - Exclude HITL-focused demos (next session).
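The appendix entry moved above describes data-level parallelism: fan out over chunk batches, embed each batch concurrently, fan in the embedded chunks. A minimal sketch of that map-reduce shape in plain asyncio, with a dummy embedding function standing in for a real embedding call (none of these names come from the plan or the framework):

```python
import asyncio

def chunk(texts: list[str], batch_size: int) -> list[list[str]]:
    """Split the chunk list into fixed-size batches (the fan-out unit)."""
    return [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]

async def embed_batch(batch: list[str]) -> list[list[float]]:
    # Stand-in for a real embedding call; yields one dummy vector per chunk.
    await asyncio.sleep(0)  # yield control, as a network call would
    return [[float(len(text))] for text in batch]

async def ingest(texts: list[str], batch_size: int = 2) -> list[list[float]]:
    batches = chunk(texts, batch_size)
    # Fan-out: embed all batches concurrently; gather preserves batch order.
    embedded = await asyncio.gather(*(embed_batch(b) for b in batches))
    # Fan-in: flatten batch results back into one ordered list of vectors.
    return [vec for batch in embedded for vec in batch]

vectors = asyncio.run(ingest(["alpha", "beta", "gamma", "delta", "epsilon"]))
print(len(vectors))  # one vector per chunk
```

The contrast with `workflow_agents_concurrent.py` is then easy to state: there the parallel branches are different specialists over the same input; here they are identical workers over different slices of the input.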
@@ -150,7 +150,7 @@ This plan is organized by proposed example file. For each file: (1) source examp
 ## Verification Plan
 1. Smoke run each new script with standard provider env pattern used by current examples.
 2. Verify each script’s terminal output demonstrates the intended teaching point above.
-3. Run an explicit A/B comparison between `workflow_agents_concurrent.py` and `workflow_rag_ingest_parallel.py` and capture whether the audience sees a clear difference in orchestration pattern (specialist-agent parallelism vs data-pipeline parallelism).
 4. Rehearse in talk order with one fallback per segment (if a demo fails, which demo replaces it).
+
 ## Open Questions for MAF Team
 - **When is `output_executors` needed/recommended?** By default, `WorkflowBuilder` surfaces all outputs from all executors as events. If you only want outputs from certain executors, use `output_executors`. In our fan-out/fan-in demos, without `output_executors=[<aggregator>]`, the intermediate `Agent` nodes' `AgentResponse` objects leak into `get_outputs()` alongside the aggregator's output. We added `output_executors` to every fan-out/fan-in demo. Worth a slide callout — possibly on the same slide as the aggregation patterns table.
