[BREAKING] Python: Update orchestration return types #4733
TaoChenOSU wants to merge 2 commits into microsoft:main from
Conversation
moonbox3
left a comment
Automated Code Review
Reviewers: 4 | Confidence: 89%
✗ Correctness
The diff has two correctness bugs. First, _CallbackAggregator.aggregate no longer wraps sync callbacks with asyncio.to_thread, meaning sync callbacks will block the event loop, contradicting the docstring that still promises non-blocking execution. The import asyncio was even removed. Second, OrchestrationOutput is not exported from the package's __init__.py or __all__, forcing all consumers (including the updated tests) to import from the private _orchestration_shared module. This makes the primary output type of every orchestration inaccessible via the public API. Additionally, the Magentic _prepare_final_answer semantically changed from yielding only the final answer to yielding the entire conversation history, a behavioral change beyond the stated wrapping goal.
✗ Security Reliability
The _CallbackAggregator refactor introduces a reliability regression: synchronous callbacks previously ran via asyncio.to_thread to avoid blocking the event loop, but now execute directly on the event loop thread. If a user-provided sync callback performs I/O or CPU-heavy work, this will block all concurrent async tasks. Additionally, the callback return value is unconditionally passed to list() without a None guard, so a callback that returns None (e.g., a bare function with no return) will crash with TypeError at runtime despite the type hint. The Magentic _prepare_final_answer behavioral change—from yielding only the final answer to yielding the entire chat history—could unintentionally expose sensitive intermediate conversation data to downstream consumers.
✗ Test Coverage
Test coverage is broadly updated for the OrchestrationOutput wrapping, but has notable gaps: (1) The Magentic behavioral change in _prepare_final_answer (now yielding full chat history instead of just the final answer) has no test verifying the new output content structure; existing tests only assert on types. (2) The _CallbackAggregator dropped asyncio.to_thread for sync callbacks, meaning sync callbacks now block the event loop, but the sync callback test doesn't detect this regression and has a stale comment claiming it still uses asyncio.to_thread. (3) OrchestrationOutput is not exported in __init__.py, so all tests import from the private _orchestration_shared path; there's no test proving public importability. (4) The _CallbackAggregator now unconditionally calls list(ret), which will raise TypeError if a callback returns None, but there is no test covering this edge case.
✗ Design Approach
The PR introduces OrchestrationOutput as a wrapper for the final list[Message] from all orchestrations to enable isinstance-based type discrimination in the event stream. The core problem being solved is real and the consolidation of shared helpers is reasonable housekeeping. However, three design issues stand out: (1) OrchestrationOutput is the primary output type SDK consumers will interact with, yet it is never exported from __init__.py; every test imports it from the private _orchestration_shared module, forcing users to do the same; (2) the _CallbackAggregator API has been silently narrowed from accepting any return type or a ctx parameter to requiring list[Message], removing a legitimate power-user escape hatch without a stated rationale; and (3) Magentic's _prepare_final_answer now returns the full chat history instead of just the final answer message, a silent behavioral change buried in a 'type wrapping' PR that could break consumers who expected a single-message result.
Flagged Issues
- _CallbackAggregator.aggregate no longer uses asyncio.to_thread for synchronous callbacks (the asyncio import was removed entirely), so any non-trivial sync callback will block the event loop. The docstring still claims sync callbacks run via asyncio.to_thread. This is a reliability regression that can cause latency spikes or deadlocks in production.
- OrchestrationOutput is not imported or listed in __all__ in agent_framework_orchestrations/__init__.py. Every consumer (including all tests) must import from the private _orchestration_shared module. Since this is the type users must isinstance-check against to consume workflow results, it must be part of the public API before the PR ships.
- _CallbackAggregator unconditionally calls list(ret) on the callback return value. If a callback returns None (e.g., a function with no return statement), this raises TypeError at runtime. The old code had an explicit if ret is not None guard.
- Magentic _prepare_final_answer changed from yielding [final_answer] (one message) to yielding OrchestrationOutput(messages=[*chat_history, final_answer]) (the entire conversation). This is an undisclosed semantic breaking change that could expose sensitive intermediate data (tool calls, agent reasoning) and break consumers expecting a single-message result. Existing tests only assert isinstance; they never verify that output.messages contains the expected content. If intentional, it needs explicit documentation and test coverage; if incidental, revert to wrapping only [final_answer].
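The first and third flagged issues (sync callbacks blocking the loop, and the missing None guard) can be sketched together as below. This is only an illustration under stated assumptions: Message and OrchestrationOutput are simplified stand-ins for the real types, and run_aggregator_callback is a hypothetical free function, not the actual _CallbackAggregator.aggregate method from the PR.

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Message:
    """Stand-in for agent_framework's Message (assumption, heavily simplified)."""

    text: str


@dataclass
class OrchestrationOutput:
    """Stand-in mirroring the single-field dataclass added in this PR."""

    messages: list = field(default_factory=list)


async def run_aggregator_callback(callback: Callable[[list], Any], results: list) -> OrchestrationOutput:
    """Hypothetical helper: invoke a sync or async callback without blocking the loop."""
    if inspect.iscoroutinefunction(callback):
        # Async callbacks are awaited directly on the event loop.
        ret = await callback(results)
    else:
        # Sync callbacks run in a worker thread so other tasks keep making progress.
        ret = await asyncio.to_thread(callback, results)

    if ret is None:
        # Fail with a clear message instead of letting list(None) raise.
        raise TypeError("Aggregator callback must return list[Message], got None")
    return OrchestrationOutput(messages=list(ret))
```

Whether a None return should raise (as here) or be treated as a no-op (as the old code did) is a design decision for the PR author; the sketch only shows that either choice can be made explicit.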
Suggestions
- end_with_agent_executor_response silently converts a None full_conversation into an empty OrchestrationOutput via or []. Consider raising an error or logging a warning if full_conversation is None, since an empty output is likely a bug.
- Add a test for Magentic verifying that output.messages contains conversation history entries beyond the final answer (e.g., assert len(output.messages) > 1 and that earlier agent turns are included) to document the new behavior.
- The sync callback test comment at line 121 of test_concurrent.py says 'should run via asyncio.to_thread' but the implementation no longer does this. Update the comment and add a test that verifies sync callbacks don't block the event loop.
- Add a test for _CallbackAggregator when a callback returns None to document the new behavior (TypeError) vs the old behavior (no-op), or add a runtime guard with a clear error message.
- The _CallbackAggregator API was silently narrowed from accepting callbacks with an optional ctx parameter (to yield arbitrary output types) to requiring list[Message] → OrchestrationOutput, removing a documented power-user extension point. If intentional, call this out as a separate API break with a rationale; if unintentional, preserve the ctx-parameter path.
- Consider renaming OrchestrationOutput to something more distinct from AgentOrchestrationOutput (e.g., OrchestrationResult or WorkflowResult). The two names are visually similar but represent completely different things—one is the manager agent's routing JSON, the other is the final conversation payload.
- clean_conversation_for_handoff was previously public API (exported from __init__.py and __all__). Renaming it to filter_tool_contents and removing both names from the public API is a silent breaking change. If intentional, document in a migration guide; if not, keep the old name as a deprecated alias.
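For the last suggestion, a deprecated alias could look roughly like the sketch below. The signature and the body of filter_tool_contents are placeholders (assumptions); the real filtering logic lives in the PR and is not reproduced here. Only the aliasing pattern is the point.

```python
import warnings
from typing import Any


def filter_tool_contents(conversation: list) -> list:
    """Placeholder for the renamed helper; the signature and body are assumed."""
    return list(conversation)


def clean_conversation_for_handoff(conversation: list) -> Any:
    """Deprecated alias kept so existing imports continue to work."""
    warnings.warn(
        "clean_conversation_for_handoff is deprecated; use filter_tool_contents instead",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not this shim
    )
    return filter_tool_contents(conversation)
```

Keeping the old name exported for one release gives downstream users a warning rather than an ImportError.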
Automated review by moonbox3's agents
| Accepts either an async or sync callback with one of the signatures: | ||
| - (results: list[AgentExecutorResponse]) -> Any | None | ||
| - (results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None | ||
| Accepts either an async or sync callback with the following signature: | ||
| - (results: list[AgentExecutorResponse]) -> list[Message] | ||
Bug: sync callbacks are now called directly on the event loop thread instead of via asyncio.to_thread, blocking all concurrent async tasks. The asyncio import was removed entirely. Restore the asyncio.to_thread path for non-awaitable callbacks.
| Accepts either an async or sync callback with one of the signatures: | |
| - (results: list[AgentExecutorResponse]) -> Any | None | |
| - (results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None | |
| Accepts either an async or sync callback with the following signature: | |
| - (results: list[AgentExecutorResponse]) -> list[Message] | |
| if inspect.iscoroutinefunction(self._callback): | |
| ret = await self._callback(results) | |
| else: | |
| import asyncio | |
| ret = await asyncio.to_thread(self._callback, results) | |
| await ctx.yield_output(OrchestrationOutput(messages=list(ret))) |
| output.extend(assistant_replies) | ||
| await ctx.yield_output(output) | ||
| await ctx.yield_output(OrchestrationOutput(messages=output)) |
Stale docstring: still says 'Sync callbacks are executed via asyncio.to_thread to avoid blocking the event loop' but the implementation no longer does this. Update the docstring to match the actual behavior, or (better) restore the asyncio.to_thread wrapping.
| - (results: list[AgentExecutorResponse]) -> list[Message] | ||
| The returned list[Message] is automatically wrapped in an OrchestrationOutput. | ||
If the callback returns None at runtime (e.g., a function with no return statement), list(ret) raises TypeError. The old code had an explicit None guard. Add validation to produce a clear error message.
| if ret is None: | |
| raise TypeError("Aggregator callback must return list[Message], got None") | |
| await ctx.yield_output(OrchestrationOutput(messages=list(ret))) |
| from ._orchestration_request_info import AgentRequestInfoResponse | ||
| from ._orchestration_state import OrchestrationState | ||
| from ._orchestrator_helpers import clean_conversation_for_handoff, create_completion_message | ||
| from ._orchestration_shared import OrchestrationState |
OrchestrationOutput is the primary type SDK consumers need for isinstance checks and type hints on workflow results, but it is never imported here. Add it to the imports.
| from ._orchestration_shared import OrchestrationState | |
| from ._orchestration_shared import OrchestrationOutput, OrchestrationState |
| @@ -105,6 +104,4 @@ | |||
| "StandardMagenticManager", | |||
OrchestrationOutput is not included in __all__. Add it so users can import it from the public package path rather than the private _orchestration_shared module.
| "StandardMagenticManager", | |
| "OrchestrationOutput", | |
| "StandardMagenticManager", | |
| "TerminationCondition", | |
| "__version__", |
| # Emit a completed event for the workflow | ||
| await ctx.yield_output([final_answer]) | ||
| await ctx.yield_output(OrchestrationOutput(messages=[*self._magentic_context.chat_history, final_answer])) |
Silent behavioral change: previously yielded [final_answer] (single message), now yields the entire conversation history plus the final answer. This changes what downstream consumers receive and may expose sensitive intermediate data. Wrapping in OrchestrationOutput should not change what is wrapped. If aligning Magentic with other orchestrations that return full history is intentional, it should be a separate explicit decision with documentation.
| await ctx.yield_output(OrchestrationOutput(messages=[*self._magentic_context.chat_history, final_answer])) | |
| await ctx.yield_output(OrchestrationOutput(messages=[final_answer])) |
| assert output is not None | ||
| assert isinstance(output, list) | ||
| assert all(isinstance(msg, Message) for msg in output) | ||
| assert isinstance(output, OrchestrationOutput) |
These assertions only verify the output type, not the behavioral change. _prepare_final_answer now yields the full chat history plus the final answer (previously just [final_answer]). Add an assertion that output.messages contains the expected number of messages and includes conversation history, e.g. assert len(output.messages) > 1.
| # Sync callback with ctx parameter (should run via asyncio.to_thread) | ||
| def summarize_sync(results: list[AgentExecutorResponse], _ctx: WorkflowContext[Any]) -> str: # type: ignore[unused-argument] | ||
| # Sync callback (should run via asyncio.to_thread) | ||
| def summarize_sync(results: list[AgentExecutorResponse]) -> list[Message]: |
Stale comment: the implementation no longer uses asyncio.to_thread for sync callbacks—they are called directly on the event loop. This test doesn't detect the behavioral regression. Update the comment and consider adding a test that verifies sync callbacks don't block the loop.
| def summarize_sync(results: list[AgentExecutorResponse]) -> list[Message]: | |
| # Sync callback (called directly; no longer wrapped in asyncio.to_thread) |
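A test that would actually detect the regression could count how often a concurrent "heartbeat" task gets to run while the callback executes. The sketch below is a rough illustration only: slow_sync_callback, count_ticks_while, and the timings are hypothetical and not part of test_concurrent.py.

```python
import asyncio
import time


def slow_sync_callback(results: list) -> list:
    """Hypothetical sync callback that simulates blocking I/O."""
    time.sleep(0.3)
    return list(results)


async def count_ticks_while(awaitable_factory) -> int:
    """Run the awaitable and count how often a concurrent heartbeat task gets scheduled."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    try:
        await awaitable_factory()
    finally:
        hb.cancel()
        # Wait for the cancellation to be processed so no task is left pending.
        await asyncio.gather(hb, return_exceptions=True)
    return ticks


async def call_directly() -> None:
    # Calls the sync callback on the event loop thread (the behavior after this PR).
    slow_sync_callback([])


async def call_in_thread() -> None:
    # Offloads the sync callback to a worker thread (the behavior before this PR).
    await asyncio.to_thread(slow_sync_callback, [])
```

With the direct call, the heartbeat never gets a chance to run before the callback returns; with asyncio.to_thread it ticks repeatedly. A real test would assert on that difference with generous thresholds, since exact tick counts depend on machine load.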
| from agent_framework._types import Message | ||
| @dataclass | ||
| class OrchestrationOutput: | ||
| """Standardized output format for orchestrations. | ||
| Attributes: | ||
| messages: List of messages representing the full conversation of the orchestration, including all agent turns. | ||
| """ | ||
OrchestrationOutput is defined here but never added to the package's __init__.py or __all__. Every test imports it from this private module. As the canonical output type for all orchestrations, it should be publicly exported.
moonbox3
left a comment
A larger question: what future fields justify OrchestrationOutput as a single-field wrapper over list[Message]? If we're planning to add metadata, termination_reason, duration, etc., the wrapper earns its keep. If it's purely for isinstance discrimination in the event stream, it adds .messages ceremony everywhere. Is there a lighter way to solve that (for example, a distinct event type for workflow completion)?
I agree. Creating a new, distinct event type for workflow completion is a good idea. However, that event will no longer be of type output, which is probably OK. A potential issue is that when an orchestration is wrapped as a sub-workflow, it will only produce incremental updates, which downstream executors then have to put back together. Let's discuss this option today.
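For reference, the isinstance-based discrimination under discussion would look roughly like this on the consumer side. All three dataclasses are simplified stand-ins (assumptions), and split_outputs is a hypothetical helper rather than anything in the framework.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Optional


@dataclass
class Message:
    """Stand-in for agent_framework's Message (assumption)."""

    role: str
    text: str


@dataclass
class AgentResponse:
    """Stand-in for a per-agent response surfaced during the run (assumption)."""

    messages: list


@dataclass
class OrchestrationOutput:
    """Stand-in for the wrapper introduced in this PR."""

    messages: list


def split_outputs(outputs: Iterable[Any]) -> tuple:
    """Separate per-agent responses from the final workflow output by type."""
    agent_responses: list = []
    final: Optional[OrchestrationOutput] = None
    for item in outputs:
        if isinstance(item, OrchestrationOutput):
            final = item  # the workflow-level result
        elif isinstance(item, AgentResponse):
            agent_responses.append(item)  # an individual agent turn
    return agent_responses, final
```

A dedicated completion event type would move this check from the payload type to the event type, which removes the .messages indirection but raises the sub-workflow concern mentioned above.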
Motivation and Context
We currently don't have a way to distinguish agent outputs from workflow outputs in an orchestration. This is especially confusing when an orchestration is used as an agent, because there is no way to tell whether a response comes from an agent or is the output of the workflow.
Take sequential as an example: each agent produces an AgentResponse that gets surfaced to the caller. However, the workflow additionally outputs a list of messages containing the full conversation when it completes. This list of messages is converted to an AgentResponse and surfaced to the caller as well. This results in the following, which is very misleading:
Description
Contribution Checklist