Skip to content

[BREAKING] Python: Update orchestration return types#4733

Draft
TaoChenOSU wants to merge 2 commits intomicrosoft:mainfrom
TaoChenOSU:taochen/python-update-orchestration-return-types
Draft

[BREAKING] Python: Update orchestration return types#4733
TaoChenOSU wants to merge 2 commits intomicrosoft:mainfrom
TaoChenOSU:taochen/python-update-orchestration-return-types

Conversation

@TaoChenOSU
Copy link
Contributor

@TaoChenOSU TaoChenOSU commented Mar 16, 2026

Motivation and Context

We currently don't have a way to distinguish agent outputs and workflow outputs in an orchestration. This is especially confusing when an orchestration is used as an agent, because there is no way to distinguish if a response is from an agent or it is the output of the workflow.

Take sequential for example, each agent will produce an AgentResponse that will get surfaced to the caller. However, the workflow will additionally output a list of messages containing the full conversation when it completes. This list of messages will get converted to an AgentResponse and get surfaced to the caller. This results in the following that will be very misleading:

Agent A: ...  <- Agent output
Agent B: ...  <- Agent output
Agent A: ...  <- Workflow output that gets converted to `AgentResponse`
Agent B: ...  <- Workflow output that gets converted to `AgentResponse`

Description

Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.

@TaoChenOSU TaoChenOSU self-assigned this Mar 16, 2026
@TaoChenOSU TaoChenOSU added python workflows Related to Workflows in agent-framework labels Mar 16, 2026
@github-actions github-actions bot changed the title Update orchestration return types Python: Update orchestration return types Mar 16, 2026
@TaoChenOSU TaoChenOSU changed the title Python: Update orchestration return types [BREAKING] Python: Update orchestration return types Mar 16, 2026
Copy link
Contributor

@moonbox3 moonbox3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated Code Review

Reviewers: 4 | Confidence: 89%

✗ Correctness

The diff has two correctness bugs. First, _CallbackAggregator.aggregate no longer wraps sync callbacks with asyncio.to_thread, meaning sync callbacks will block the event loop — contradicting the docstring that still promises non-blocking execution. The import asyncio was even removed. Second, OrchestrationOutput is not exported from the package's __init__.py or __all__, forcing all consumers (including the updated tests) to import from the private _orchestration_shared module. This makes the primary output type of every orchestration inaccessible via the public API. Additionally, the Magentic _prepare_final_answer semantically changed from yielding only the final answer to yielding the entire conversation history — a behavioral change beyond the stated wrapping goal.

✗ Security Reliability

The _CallbackAggregator refactor introduces a reliability regression: synchronous callbacks previously ran via asyncio.to_thread to avoid blocking the event loop, but now execute directly on the event loop thread. If a user-provided sync callback performs I/O or CPU-heavy work, this will block all concurrent async tasks. Additionally, the callback return value is unconditionally passed to list() without a None guard, so a callback that returns None (e.g., a bare function with no return) will crash with TypeError at runtime despite the type hint. The Magentic _prepare_final_answer behavioral change—from yielding only the final answer to yielding the entire chat history—could unintentionally expose sensitive intermediate conversation data to downstream consumers.

✗ Test Coverage

Test coverage is broadly updated for the OrchestrationOutput wrapping, but has notable gaps: (1) The Magentic behavioral change in _prepare_final_answer — now yielding full chat history instead of just the final answer — has no test verifying the new output content structure; existing tests only assert on types. (2) The _CallbackAggregator dropped asyncio.to_thread for sync callbacks, meaning sync callbacks now block the event loop, but the sync callback test doesn't detect this regression and has a stale comment claiming it still uses asyncio.to_thread. (3) OrchestrationOutput is not exported in __init__.py, so all tests import from the private _orchestration_shared path — there's no test proving public importability. (4) The _CallbackAggregator now unconditionally calls list(ret) which will raise TypeError if a callback returns None, but there is no test covering this edge case.

✗ Design Approach

The PR introduces OrchestrationOutput as a wrapper for the final list[Message] from all orchestrations to enable isinstance-based type discrimination in the event stream. The core problem being solved is real and the consolidation of shared helpers is reasonable housekeeping. However, three design issues stand out: (1) OrchestrationOutput is the primary output type SDK consumers will interact with, yet it is never exported from init.py — every test imports it from the private _orchestration_shared module, forcing users to do the same; (2) the _CallbackAggregator API has been silently narrowed from accepting any return type or a ctx parameter to requiring list[Message], removing a legitimate power-user escape hatch without a stated rationale; and (3) Magentic's _prepare_final_answer now returns the full chat history instead of just the final answer message — a silent behavioral change buried in a 'type wrapping' PR that could break consumers who expected a single-message result.

Flagged Issues

  • _CallbackAggregator.aggregate no longer uses asyncio.to_thread for synchronous callbacks (the asyncio import was removed entirely), so any non-trivial sync callback will block the event loop. The docstring still claims sync callbacks run via asyncio.to_thread. This is a reliability regression that can cause latency spikes or deadlocks in production.
  • OrchestrationOutput is not imported or listed in all in agent_framework_orchestrations/init.py. Every consumer (including all tests) must import from the private _orchestration_shared module. Since this is the type users must isinstance-check against to consume workflow results, it must be part of the public API before the PR ships.
  • _CallbackAggregator unconditionally calls list(ret) on the callback return value. If a callback returns None (e.g., a function with no return statement), this raises TypeError at runtime. The old code had an explicit if ret is not None guard.
  • Magentic _prepare_final_answer changed from yielding [final_answer] (one message) to yielding OrchestrationOutput(messages=[*chat_history, final_answer]) (the entire conversation). This is an undisclosed semantic breaking change that could expose sensitive intermediate data (tool calls, agent reasoning) and break consumers expecting a single-message result. Existing tests only assert isinstance—they never verify that output.messages contains the expected content. If intentional, it needs explicit documentation and test coverage; if incidental, revert to wrapping only [final_answer].

Suggestions

  • end_with_agent_executor_response silently converts a None full_conversation into an empty OrchestrationOutput via or []. Consider raising an error or logging a warning if full_conversation is None, since an empty output is likely a bug.
  • Add a test for Magentic verifying that output.messages contains conversation history entries beyond the final answer (e.g., assert len(output.messages) > 1 and that earlier agent turns are included) to document the new behavior.
  • The sync callback test comment at line 121 of test_concurrent.py says 'should run via asyncio.to_thread' but the implementation no longer does this. Update the comment and add a test that verifies sync callbacks don't block the event loop.
  • Add a test for _CallbackAggregator when a callback returns None to document the new behavior (TypeError) vs the old behavior (no-op), or add a runtime guard with a clear error message.
  • The _CallbackAggregator API was silently narrowed from accepting callbacks with an optional ctx parameter (to yield arbitrary output types) to requiring list[Message] → OrchestrationOutput, removing a documented power-user extension point. If intentional, call this out as a separate API break with a rationale; if unintentional, preserve the ctx-parameter path.
  • Consider renaming OrchestrationOutput to something more distinct from AgentOrchestrationOutput (e.g., OrchestrationResult or WorkflowResult). The two names are visually similar but represent completely different things—one is the manager agent's routing JSON, the other is the final conversation payload.
  • clean_conversation_for_handoff was previously public API (exported from init.py and all). Renaming it to filter_tool_contents and removing both names from the public API is a silent breaking change. If intentional, document in a migration guide; if not, keep the old name as a deprecated alias.

Automated review by moonbox3's agents

Comment on lines 145 to +148

Accepts either an async or sync callback with one of the signatures:
- (results: list[AgentExecutorResponse]) -> Any | None
- (results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None
Accepts either an async or sync callback with the following signature:
- (results: list[AgentExecutorResponse]) -> list[Message]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: sync callbacks are now called directly on the event loop thread instead of via asyncio.to_thread, blocking all concurrent async tasks. The asyncio import was removed entirely. Restore the asyncio.to_thread path for non-awaitable callbacks.

Suggested change
Accepts either an async or sync callback with one of the signatures:
- (results: list[AgentExecutorResponse]) -> Any | None
- (results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None
Accepts either an async or sync callback with the following signature:
- (results: list[AgentExecutorResponse]) -> list[Message]
if inspect.iscoroutinefunction(self._callback):
ret = await self._callback(results)
else:
import asyncio
ret = await asyncio.to_thread(self._callback, results)
await ctx.yield_output(OrchestrationOutput(messages=list(ret)))

output.extend(assistant_replies)

await ctx.yield_output(output)
await ctx.yield_output(OrchestrationOutput(messages=output))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale docstring: still says 'Sync callbacks are executed via asyncio.to_thread to avoid blocking the event loop' but the implementation no longer does this. Update the docstring to match the actual behavior, or (better) restore the asyncio.to_thread wrapping.

- (results: list[AgentExecutorResponse]) -> list[Message]

The returned list[Message] is automatically wrapped in an OrchestrationOutput.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the callback returns None at runtime (e.g., a function with no return statement), list(ret) raises TypeError. The old code had an explicit None guard. Add validation to produce a clear error message.

Suggested change
if ret is None:
raise TypeError("Aggregator callback must return list[Message], got None")
await ctx.yield_output(OrchestrationOutput(messages=list(ret)))

from ._orchestration_request_info import AgentRequestInfoResponse
from ._orchestration_state import OrchestrationState
from ._orchestrator_helpers import clean_conversation_for_handoff, create_completion_message
from ._orchestration_shared import OrchestrationState
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OrchestrationOutput is the primary type SDK consumers need for isinstance checks and type hints on workflow results, but it is never imported here. Add it to the imports.

Suggested change
from ._orchestration_shared import OrchestrationState
from ._orchestration_shared import OrchestrationOutput, OrchestrationState

@@ -105,6 +104,4 @@
"StandardMagenticManager",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OrchestrationOutput is not included in all. Add it so users can import it from the public package path rather than the private _orchestration_shared module.

Suggested change
"StandardMagenticManager",
"OrchestrationOutput",
"StandardMagenticManager",
"TerminationCondition",
"__version__",


# Emit a completed event for the workflow
await ctx.yield_output([final_answer])
await ctx.yield_output(OrchestrationOutput(messages=[*self._magentic_context.chat_history, final_answer]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silent behavioral change: previously yielded [final_answer] (single message), now yields the entire conversation history plus the final answer. This changes what downstream consumers receive and may expose sensitive intermediate data. Wrapping in OrchestrationOutput should not change what is wrapped. If aligning Magentic with other orchestrations that return full history is intentional, it should be a separate explicit decision with documentation.

Suggested change
await ctx.yield_output(OrchestrationOutput(messages=[*self._magentic_context.chat_history, final_answer]))
await ctx.yield_output(OrchestrationOutput(messages=[final_answer]))

Comment on lines 285 to +286
assert output is not None
assert isinstance(output, list)
assert all(isinstance(msg, Message) for msg in output)
assert isinstance(output, OrchestrationOutput)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These assertions only verify the output type, not the behavioral change. _prepare_final_answer now yields the full chat history plus the final answer (previously just [final_answer]). Add an assertion that output.messages contains the expected number of messages and includes conversation history, e.g. assert len(output.messages) > 1.

# Sync callback with ctx parameter (should run via asyncio.to_thread)
def summarize_sync(results: list[AgentExecutorResponse], _ctx: WorkflowContext[Any]) -> str: # type: ignore[unused-argument]
# Sync callback (should run via asyncio.to_thread)
def summarize_sync(results: list[AgentExecutorResponse]) -> list[Message]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale comment: the implementation no longer uses asyncio.to_thread for sync callbacks—they are called directly on the event loop. This test doesn't detect the behavioral regression. Update the comment and consider adding a test that verifies sync callbacks don't block the loop.

Suggested change
def summarize_sync(results: list[AgentExecutorResponse]) -> list[Message]:
# Sync callback (called directly; no longer wrapped in asyncio.to_thread)

Comment on lines 8 to +18
from agent_framework._types import Message


@dataclass
class OrchestrationOutput:
"""Standardized output format for orchestrations.

Attributes:
messages: List of messages representing the full conversation of the orchestration, including all agent turns.
"""

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OrchestrationOutput is defined here but never added to the package's init.py or all. Every test imports it from this private module. As the canonical output type for all orchestrations, it should be publicly exported.

Copy link
Contributor

@moonbox3 moonbox3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A larger question: what future fields justify OrchestrationOutput as a single-field wrapper over list[Message]? If we're planning to add metadata, termination_reason, duration, etc., the wrapper earns its keep. If it's purely for
isinstance discrimination in the event stream, it adds .messages ceremony everywhere. Is there a lighter way to solve that (for example, a distinct event type for workflow completion)?

@TaoChenOSU
Copy link
Contributor Author

TaoChenOSU commented Mar 17, 2026

A larger question: what future fields justify OrchestrationOutput as a single-field wrapper over list[Message]? If we're planning to add metadata, termination_reason, duration, etc., the wrapper earns its keep. If it's purely for isinstance discrimination in the event stream, it adds .messages ceremony everywhere. Is there a lighter way to solve that (for example, a distinct event type for workflow completion)?

I also agree.

Creating a new distinct event type for workflow completion is a good idea. However, that event type will no longer be of type output. I think that's probably ok. A potential issue is that when an orchestration is wrapped as a sub workflow, it will only produce incremental updates that downstream executors have to put them back together. Let's discuss this option today.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

python workflows Related to Workflows in agent-framework

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

2 participants