-
Notifications
You must be signed in to change notification settings - Fork 1.4k
[BREAKING] Python: Update orchestration return types #4733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -61,8 +61,7 @@ | |||||||||||
| StandardMagenticManager, | ||||||||||||
| ) | ||||||||||||
| from ._orchestration_request_info import AgentRequestInfoResponse | ||||||||||||
| from ._orchestration_state import OrchestrationState | ||||||||||||
| from ._orchestrator_helpers import clean_conversation_for_handoff, create_completion_message | ||||||||||||
| from ._orchestration_shared import OrchestrationState | ||||||||||||
| from ._sequential import SequentialBuilder | ||||||||||||
|
|
||||||||||||
| __all__ = [ | ||||||||||||
|
|
@@ -105,6 +104,4 @@ | |||||||||||
| "StandardMagenticManager", | ||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OrchestrationOutput is not included in all. Add it so users can import it from the public package path rather than the private _orchestration_shared module.
Suggested change
|
||||||||||||
| "TerminationCondition", | ||||||||||||
| "__version__", | ||||||||||||
| "clean_conversation_for_handoff", | ||||||||||||
| "create_completion_message", | ||||||||||||
| ] | ||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,10 +1,9 @@ | ||||||||||||||||||||||||||||
| # Copyright (c) Microsoft. All rights reserved. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||
| import inspect | ||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||
| from collections.abc import Callable, Sequence | ||||||||||||||||||||||||||||
| from typing import Any | ||||||||||||||||||||||||||||
| from typing import Any, Awaitable | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| from agent_framework import Message, SupportsAgentRun | ||||||||||||||||||||||||||||
| from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse | ||||||||||||||||||||||||||||
|
|
@@ -18,6 +17,7 @@ | |||||||||||||||||||||||||||
| from typing_extensions import Never | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| from ._orchestration_request_info import AgentApprovalExecutor | ||||||||||||||||||||||||||||
| from ._orchestration_shared import OrchestrationOutput | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
@@ -32,13 +32,11 @@ | |||||||||||||||||||||||||||
| - Participants can be provided as SupportsAgentRun or Executor instances via `participants=[...]`. | ||||||||||||||||||||||||||||
| - A custom aggregator can be provided as: | ||||||||||||||||||||||||||||
| - an Executor instance (it should handle list[AgentExecutorResponse], | ||||||||||||||||||||||||||||
| yield output), or | ||||||||||||||||||||||||||||
| yield OrchestrationOutput), or | ||||||||||||||||||||||||||||
| - a callback function with signature: | ||||||||||||||||||||||||||||
| def cb(results: list[AgentExecutorResponse]) -> Any | None | ||||||||||||||||||||||||||||
| def cb(results: list[AgentExecutorResponse], ctx: WorkflowContext) -> Any | None | ||||||||||||||||||||||||||||
| The callback is wrapped in _CallbackAggregator. | ||||||||||||||||||||||||||||
| If the callback returns a non-None value, _CallbackAggregator yields that as output. | ||||||||||||||||||||||||||||
| If it returns None, the callback may have already yielded an output via ctx, so no further action is taken. | ||||||||||||||||||||||||||||
| def cb(results: list[AgentExecutorResponse]) -> list[Message] | ||||||||||||||||||||||||||||
| The callback is wrapped in _CallbackAggregator, which wraps the returned | ||||||||||||||||||||||||||||
| list[Message] in an OrchestrationOutput and yields it. | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
@@ -82,7 +80,9 @@ class _AggregateAgentConversations(Executor): | |||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| @handler | ||||||||||||||||||||||||||||
| async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, list[Message]]) -> None: | ||||||||||||||||||||||||||||
| async def aggregate( | ||||||||||||||||||||||||||||
| self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, OrchestrationOutput] | ||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||
| if not results: | ||||||||||||||||||||||||||||
| logger.error("Concurrent aggregator received empty results list") | ||||||||||||||||||||||||||||
| raise ValueError("Aggregation failed: no results provided") | ||||||||||||||||||||||||||||
|
|
@@ -137,47 +137,43 @@ def _is_role(msg: Any, role: str) -> bool: | |||||||||||||||||||||||||||
| logger.warning("No user prompt found in any conversation; emitting assistants only") | ||||||||||||||||||||||||||||
| output.extend(assistant_replies) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await ctx.yield_output(output) | ||||||||||||||||||||||||||||
| await ctx.yield_output(OrchestrationOutput(messages=output)) | ||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stale docstring: still says 'Sync callbacks are executed via asyncio.to_thread to avoid blocking the event loop' but the implementation no longer does this. Update the docstring to match the actual behavior, or (better) restore the asyncio.to_thread wrapping. |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| class _CallbackAggregator(Executor): | ||||||||||||||||||||||||||||
| """Wraps a Python callback as an aggregator. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Accepts either an async or sync callback with one of the signatures: | ||||||||||||||||||||||||||||
| - (results: list[AgentExecutorResponse]) -> Any | None | ||||||||||||||||||||||||||||
| - (results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None | ||||||||||||||||||||||||||||
| Accepts either an async or sync callback with the following signature: | ||||||||||||||||||||||||||||
| - (results: list[AgentExecutorResponse]) -> list[Message] | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
Comment on lines
145
to
+148
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: sync callbacks are now called directly on the event loop thread instead of via asyncio.to_thread, blocking all concurrent async tasks. The asyncio import was removed entirely. Restore the asyncio.to_thread path for non-awaitable callbacks.
Suggested change
|
||||||||||||||||||||||||||||
| The returned list[Message] is automatically wrapped in an OrchestrationOutput. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the callback returns None at runtime (e.g., a function with no return statement),
Suggested change
|
||||||||||||||||||||||||||||
| Notes: | ||||||||||||||||||||||||||||
| - Async callbacks are awaited directly. | ||||||||||||||||||||||||||||
| - Sync callbacks are executed via asyncio.to_thread to avoid blocking the event loop. | ||||||||||||||||||||||||||||
| - If the callback returns a non-None value, it is yielded as an output. | ||||||||||||||||||||||||||||
| - If the callback returns a non-None value, it is wrapped in OrchestrationOutput and yielded. | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def __init__(self, callback: Callable[..., Any], id: str | None = None) -> None: | ||||||||||||||||||||||||||||
| def __init__( | ||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||
| callback: Callable[[list[AgentExecutorResponse]], Awaitable[list[Message]] | list[Message]], | ||||||||||||||||||||||||||||
| id: str | None = None, | ||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||
| derived_id = getattr(callback, "__name__", "") or "" | ||||||||||||||||||||||||||||
| if not derived_id or derived_id == "<lambda>": | ||||||||||||||||||||||||||||
| derived_id = f"{type(self).__name__}_unnamed" | ||||||||||||||||||||||||||||
| super().__init__(id or derived_id) | ||||||||||||||||||||||||||||
| self._callback = callback | ||||||||||||||||||||||||||||
| self._param_count = len(inspect.signature(callback).parameters) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| @handler | ||||||||||||||||||||||||||||
| async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, Any]) -> None: | ||||||||||||||||||||||||||||
| # Call according to provided signature, always non-blocking for sync callbacks | ||||||||||||||||||||||||||||
| if self._param_count >= 2: | ||||||||||||||||||||||||||||
| if inspect.iscoroutinefunction(self._callback): | ||||||||||||||||||||||||||||
| ret = await self._callback(results, ctx) # type: ignore[misc] | ||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||
| ret = await asyncio.to_thread(self._callback, results, ctx) | ||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||
| if inspect.iscoroutinefunction(self._callback): | ||||||||||||||||||||||||||||
| ret = await self._callback(results) # type: ignore[misc] | ||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||
| ret = await asyncio.to_thread(self._callback, results) | ||||||||||||||||||||||||||||
| async def aggregate( | ||||||||||||||||||||||||||||
| self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, OrchestrationOutput] | ||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||
| ret = self._callback(results) | ||||||||||||||||||||||||||||
| if inspect.isawaitable(ret): | ||||||||||||||||||||||||||||
| ret = await ret | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # If the callback returned a value, finalize the workflow with it | ||||||||||||||||||||||||||||
| if ret is not None: | ||||||||||||||||||||||||||||
| await ctx.yield_output(ret) | ||||||||||||||||||||||||||||
| await ctx.yield_output(OrchestrationOutput(messages=list(ret))) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| class ConcurrentBuilder: | ||||||||||||||||||||||||||||
|
|
@@ -193,7 +189,7 @@ class ConcurrentBuilder: | |||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| from agent_framework_orchestrations import ConcurrentBuilder | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Minimal: use default aggregator (returns list[Message]) | ||||||||||||||||||||||||||||
| # Minimal: use default aggregator (returns OrchestrationOutput) | ||||||||||||||||||||||||||||
| workflow = ConcurrentBuilder(participants=[agent1, agent2, agent3]).build() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
@@ -265,17 +261,14 @@ def _set_participants(self, participants: Sequence[SupportsAgentRun | Executor]) | |||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def with_aggregator( | ||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||
| aggregator: Executor | ||||||||||||||||||||||||||||
| | Callable[[list[AgentExecutorResponse]], Any] | ||||||||||||||||||||||||||||
| | Callable[[list[AgentExecutorResponse], WorkflowContext[Never, Any]], Any], | ||||||||||||||||||||||||||||
| aggregator: Executor | Callable[[list[AgentExecutorResponse]], Awaitable[list[Message]] | list[Message]], | ||||||||||||||||||||||||||||
| ) -> "ConcurrentBuilder": | ||||||||||||||||||||||||||||
| r"""Override the default aggregator with an executor or a callback. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| - Executor: must handle `list[AgentExecutorResponse]` and yield output using `ctx.yield_output(...)` | ||||||||||||||||||||||||||||
| - Callback: sync or async callable with one of the signatures: | ||||||||||||||||||||||||||||
| `(results: list[AgentExecutorResponse]) -> Any | None` or | ||||||||||||||||||||||||||||
| `(results: list[AgentExecutorResponse], ctx: WorkflowContext) -> Any | None`. | ||||||||||||||||||||||||||||
| If the callback returns a non-None value, it becomes the workflow's output. | ||||||||||||||||||||||||||||
| - Callback: sync or async callable with the signature: | ||||||||||||||||||||||||||||
| `(results: list[AgentExecutorResponse]) -> list[Message]`. | ||||||||||||||||||||||||||||
| The returned list[Message] is automatically wrapped in an OrchestrationOutput. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||
| aggregator: Executor instance, or callback function | ||||||||||||||||||||||||||||
|
|
@@ -287,23 +280,15 @@ def with_aggregator( | |||||||||||||||||||||||||||
| class CustomAggregator(Executor): | ||||||||||||||||||||||||||||
| @handler | ||||||||||||||||||||||||||||
| async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext) -> None: | ||||||||||||||||||||||||||||
| await ctx.yield_output(" | ".join(r.agent_response.messages[-1].text for r in results)) | ||||||||||||||||||||||||||||
| await ctx.yield_output(OrchestrationOutput(messages=[...])) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| wf = ConcurrentBuilder(participants=[a1, a2, a3]).with_aggregator(CustomAggregator()).build() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Callback-based aggregator (string result) | ||||||||||||||||||||||||||||
| async def summarize(results: list[AgentExecutorResponse]) -> str: | ||||||||||||||||||||||||||||
| return " | ".join(r.agent_response.messages[-1].text for r in results) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| wf = ConcurrentBuilder(participants=[a1, a2, a3]).with_aggregator(summarize).build() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Callback-based aggregator (yield result) | ||||||||||||||||||||||||||||
| async def summarize(results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: | ||||||||||||||||||||||||||||
| await ctx.yield_output(" | ".join(r.agent_response.messages[-1].text for r in results)) | ||||||||||||||||||||||||||||
| # Callback-based aggregator (returns list[Message], wrapped in OrchestrationOutput) | ||||||||||||||||||||||||||||
| async def summarize(results: list[AgentExecutorResponse]) -> list[Message]: | ||||||||||||||||||||||||||||
| return [r.agent_response.messages[-1] for r in results] | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| wf = ConcurrentBuilder(participants=[a1, a2, a3]).with_aggregator(summarize).build() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OrchestrationOutput is the primary type SDK consumers need for isinstance checks and type hints on workflow results, but it is never imported here. Add it to the imports.