Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/strands/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,9 +886,14 @@ async def _run_loop(

await self._append_messages(*current_messages)

# Check if the model supports native structured output
model_config = self.model.get_config()
native_mode = isinstance(model_config, dict) and model_config.get("structured_output_mode") == "native"

structured_output_context = StructuredOutputContext(
structured_output_model or self._default_structured_output_model,
structured_output_prompt=structured_output_prompt or self._structured_output_prompt,
native_mode=native_mode,
)

# Execute the event loop cycle with retry logic for context limits
Expand Down Expand Up @@ -950,7 +955,7 @@ async def _execute_event_loop_cycle(
# Add `Agent` to invocation_state to keep backwards-compatibility
invocation_state["agent"] = self

if structured_output_context:
if structured_output_context and not structured_output_context.native_mode:
structured_output_context.register_tool(self.tool_registry)

try:
Expand Down
67 changes: 52 additions & 15 deletions src/strands/event_loop/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,25 +201,62 @@ async def event_loop_cycle(
# End the cycle and return results
agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes)

# Force structured output tool call if LLM didn't use it automatically
# Handle structured output when model returns end_turn
if structured_output_context.is_enabled and stop_reason == "end_turn":
if structured_output_context.force_attempted:
if structured_output_context.native_mode:
# Native mode: use model's native structured output for final formatting.
# The agent loop ran normally with tools and thinking; only this final
# step uses native structured output via model.structured_output().
logger.debug("using native structured output for final formatting")
# Append a user message so the conversation doesn't end with an assistant
# message (some models like Opus 4.6 don't support assistant prefill).
await agent._append_messages(
{"role": "user", "content": [{"text": structured_output_context.structured_output_prompt}]}
)
native_result = None
async for event in agent.model.structured_output(
structured_output_context.structured_output_model,
agent.messages,
system_prompt=agent.system_prompt,
):
if "output" in event:
native_result = event["output"]

if native_result is not None:
yield StructuredOutputEvent(structured_output=native_result)
tracer.end_event_loop_cycle_span(cycle_span, message)
yield EventLoopStopEvent(
stop_reason,
message,
agent.event_loop_metrics,
invocation_state["request_state"],
structured_output=native_result,
)
return
raise StructuredOutputException(
"The model failed to invoke the structured output tool even after it was forced."
"Native structured output mode: model did not return structured output."
)
else:
# Tool mode: force the model to call the structured output tool
if structured_output_context.force_attempted:
raise StructuredOutputException(
"The model failed to invoke the structured output tool even after it was forced."
)
structured_output_context.set_forced_mode()
logger.debug("Forcing structured output tool")
await agent._append_messages(
{"role": "user", "content": [{"text": structured_output_context.structured_output_prompt}]}
)
structured_output_context.set_forced_mode()
logger.debug("Forcing structured output tool")
await agent._append_messages(
{"role": "user", "content": [{"text": structured_output_context.structured_output_prompt}]}
)

tracer.end_event_loop_cycle_span(cycle_span, message)
events = recurse_event_loop(
agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
)
async for typed_event in events:
yield typed_event
return
tracer.end_event_loop_cycle_span(cycle_span, message)
events = recurse_event_loop(
agent=agent,
invocation_state=invocation_state,
structured_output_context=structured_output_context,
)
async for typed_event in events:
yield typed_event
return

tracer.end_event_loop_cycle_span(cycle_span, message)
yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"])
Expand Down
84 changes: 81 additions & 3 deletions src/strands/models/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from .._exception_notes import add_exception_note
from ..event_loop import streaming
from ..tools import convert_pydantic_to_tool_spec
from ..tools import convert_pydantic_to_json_schema, convert_pydantic_to_tool_spec
from ..tools._tool_helpers import noop_tool
from ..types.content import ContentBlock, Messages, SystemContentBlock
from ..types.exceptions import (
Expand Down Expand Up @@ -98,6 +98,9 @@ class BedrockConfig(TypedDict, total=False):
Please check https://docs.aws.amazon.com/bedrock/latest/userguide/service-tiers-inference.html for
supported service tiers, models, and regions
stop_sequences: List of sequences that will stop generation when encountered
structured_output_mode: Mode for structured output. "tool" (default) uses tool-based approach,
"native" uses Bedrock's outputConfig.textFormat for schema-constrained responses.
Native mode requires a model that supports structured output.
streaming: Flag to enable/disable streaming. Defaults to True.
temperature: Controls randomness in generation (higher = more random)
top_p: Controls diversity via nucleus sampling (alternative to temperature)
Expand All @@ -123,6 +126,7 @@ class BedrockConfig(TypedDict, total=False):
include_tool_result_status: Literal["auto"] | bool | None
service_tier: str | None
stop_sequences: list[str] | None
structured_output_mode: Literal["tool", "native"] | None
streaming: bool | None
temperature: float | None
top_p: float | None
Expand Down Expand Up @@ -218,6 +222,7 @@ def _format_request(
tool_specs: list[ToolSpec] | None = None,
system_prompt_content: list[SystemContentBlock] | None = None,
tool_choice: ToolChoice | None = None,
output_config: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Format a Bedrock converse stream request.

Expand All @@ -226,6 +231,7 @@ def _format_request(
tool_specs: List of tool specifications to make available to the model.
tool_choice: Selection strategy for tool invocation.
system_prompt_content: System prompt content blocks to provide context to the model.
output_config: Output configuration for structured output (JSON schema).

Returns:
A Bedrock converse stream request.
Expand All @@ -251,6 +257,20 @@ def _format_request(
"messages": self._format_bedrock_messages(messages),
"system": system_blocks,
**({"serviceTier": {"type": self.config["service_tier"]}} if self.config.get("service_tier") else {}),
**(
{
"outputConfig": {
"textFormat": {
"type": "json_schema",
"structure": {
"jsonSchema": output_config,
},
},
}
}
if output_config
else {}
),
**(
{
"toolConfig": {
Expand Down Expand Up @@ -747,6 +767,7 @@ async def stream(
*,
tool_choice: ToolChoice | None = None,
system_prompt_content: list[SystemContentBlock] | None = None,
output_config: dict[str, Any] | None = None,
**kwargs: Any,
) -> AsyncGenerator[StreamEvent, None]:
"""Stream conversation with the Bedrock model.
Expand All @@ -760,6 +781,7 @@ async def stream(
system_prompt: System prompt to provide context to the model.
tool_choice: Selection strategy for tool invocation.
system_prompt_content: System prompt content blocks to provide context to the model.
output_config: Output configuration for structured output (JSON schema).
**kwargs: Additional keyword arguments for future extensibility.

Yields:
Expand All @@ -782,7 +804,9 @@ def callback(event: StreamEvent | None = None) -> None:
if system_prompt and system_prompt_content is None:
system_prompt_content = [{"text": system_prompt}]

thread = asyncio.to_thread(self._stream, callback, messages, tool_specs, system_prompt_content, tool_choice)
thread = asyncio.to_thread(
self._stream, callback, messages, tool_specs, system_prompt_content, tool_choice, output_config
)
task = asyncio.create_task(thread)

while True:
Expand All @@ -801,6 +825,7 @@ def _stream(
tool_specs: list[ToolSpec] | None = None,
system_prompt_content: list[SystemContentBlock] | None = None,
tool_choice: ToolChoice | None = None,
output_config: dict[str, Any] | None = None,
) -> None:
"""Stream conversation with the Bedrock model.

Expand All @@ -813,14 +838,15 @@ def _stream(
tool_specs: List of tool specifications to make available to the model.
system_prompt_content: System prompt content blocks to provide context to the model.
tool_choice: Selection strategy for tool invocation.
output_config: Output configuration for structured output (JSON schema).

Raises:
ContextWindowOverflowException: If the input exceeds the model's context window.
ModelThrottledException: If the model service is throttling requests.
"""
try:
logger.debug("formatting request")
request = self._format_request(messages, tool_specs, system_prompt_content, tool_choice)
request = self._format_request(messages, tool_specs, system_prompt_content, tool_choice, output_config)
logger.debug("request=<%s>", request)

logger.debug("invoking model")
Expand Down Expand Up @@ -1032,6 +1058,10 @@ async def structured_output(
) -> AsyncGenerator[dict[str, T | Any], None]:
"""Get structured output from the model.

Supports two modes controlled by `structured_output_mode` config:
- "tool" (default): Converts the Pydantic model to a tool spec and forces tool use.
- "native": Uses Bedrock's outputConfig.textFormat with JSON schema for guaranteed schema compliance.

Args:
output_model: The output model to use for the agent.
prompt: The prompt messages to use for the agent.
Expand All @@ -1041,6 +1071,21 @@ async def structured_output(
Yields:
Model events with the last being the structured output.
"""
if self.config.get("structured_output_mode") == "native":
async for event in self._structured_output_native(output_model, prompt, system_prompt, **kwargs):
yield event
else:
async for event in self._structured_output_tool(output_model, prompt, system_prompt, **kwargs):
yield event

async def _structured_output_tool(
self,
output_model: type[T],
prompt: Messages,
system_prompt: str | None = None,
**kwargs: Any,
) -> AsyncGenerator[dict[str, T | Any], None]:
"""Structured output using tool-based approach."""
tool_spec = convert_pydantic_to_tool_spec(output_model)

response = self.stream(
Expand Down Expand Up @@ -1073,6 +1118,39 @@ async def structured_output(

yield {"output": output_model(**output_response)}

async def _structured_output_native(
    self,
    output_model: type[T],
    prompt: Messages,
    system_prompt: str | None = None,
    **kwargs: Any,
) -> AsyncGenerator[dict[str, T | Any], None]:
    """Get structured output using Bedrock's native outputConfig.textFormat.

    Converts the Pydantic model to a JSON schema and passes it to ``stream`` as
    ``output_config`` so the model's response is schema-constrained, then parses
    the final message's text content back into an ``output_model`` instance.

    Args:
        output_model: Pydantic model describing the expected output schema.
        prompt: The prompt messages to use for the agent.
        system_prompt: System prompt to provide context to the model.
        **kwargs: Additional keyword arguments forwarded to ``stream``.

    Yields:
        Model stream events, with the last being ``{"output": <parsed model>}``.

    Raises:
        ValueError: If the stream produced no events, or the response contains
            no non-empty text content to parse.
    """
    output_config = convert_pydantic_to_json_schema(output_model)

    response = self.stream(
        messages=prompt,
        system_prompt=system_prompt,
        output_config=output_config,
        **kwargs,
    )

    # Track the last event explicitly: referencing the loop variable after the
    # loop raises NameError (not a useful error) when the stream yields nothing.
    last_event: dict[str, Any] | None = None
    async for event in streaming.process_stream(response):
        yield event
        last_event = event

    if last_event is None:
        raise ValueError("Model stream produced no events for native structured output.")

    # process_stream's final event carries (stop_reason, message, metrics, request_state);
    # only the assistant message is needed here.
    _, message, _, _ = last_event["stop"]

    # Use the last non-empty text block as the structured output payload.
    text_content: str | None = None
    for block in message["content"]:
        if "text" in block and block["text"].strip():
            text_content = block["text"]

    if text_content is None:
        raise ValueError("No text content found in the Bedrock response for native structured output.")

    output_response = json.loads(text_content)
    yield {"output": output_model(**output_response)}

@staticmethod
def _get_default_model_with_warning(region_name: str, model_config: BedrockConfig | None = None) -> str:
"""Get the default Bedrock modelId based on region.
Expand Down
3 changes: 2 additions & 1 deletion src/strands/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""

from .decorator import tool
from .structured_output import convert_pydantic_to_tool_spec
from .structured_output import convert_pydantic_to_json_schema, convert_pydantic_to_tool_spec
from .tool_provider import ToolProvider
from .tools import InvalidToolUseNameException, PythonAgentTool, normalize_schema, normalize_tool_spec

Expand All @@ -14,6 +14,7 @@
"InvalidToolUseNameException",
"normalize_schema",
"normalize_tool_spec",
"convert_pydantic_to_json_schema",
"convert_pydantic_to_tool_spec",
"ToolProvider",
]
4 changes: 2 additions & 2 deletions src/strands/tools/structured_output/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Structured output tools for the Strands Agents framework."""

from ._structured_output_context import DEFAULT_STRUCTURED_OUTPUT_PROMPT
from .structured_output_utils import convert_pydantic_to_tool_spec
from .structured_output_utils import convert_pydantic_to_json_schema, convert_pydantic_to_tool_spec

__all__ = ["convert_pydantic_to_tool_spec", "DEFAULT_STRUCTURED_OUTPUT_PROMPT"]
__all__ = ["convert_pydantic_to_json_schema", "convert_pydantic_to_tool_spec", "DEFAULT_STRUCTURED_OUTPUT_PROMPT"]
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,30 @@ def __init__(
self,
structured_output_model: type[BaseModel] | None = None,
structured_output_prompt: str | None = None,
native_mode: bool = False,
):
"""Initialize a new structured output context.

Args:
structured_output_model: Optional Pydantic model type for structured output.
structured_output_prompt: Optional custom prompt message to use when forcing structured output.
Defaults to "You must format the previous response as structured output."
native_mode: If True, use the model's native structured output for the final formatting step
instead of forcing tool use. The agent loop runs normally with tools and thinking;
only the final response formatting uses native structured output.
"""
self.results: dict[str, BaseModel] = {}
self.structured_output_model: type[BaseModel] | None = structured_output_model
self.structured_output_tool: StructuredOutputTool | None = None
self.native_mode: bool = native_mode
self.forced_mode: bool = False
self.force_attempted: bool = False
self.tool_choice: ToolChoice | None = None
self.stop_loop: bool = False
self.expected_tool_name: str | None = None
self.structured_output_prompt: str = structured_output_prompt or DEFAULT_STRUCTURED_OUTPUT_PROMPT

if structured_output_model:
if structured_output_model and not native_mode:
self.structured_output_tool = StructuredOutputTool(structured_output_model)
self.expected_tool_name = self.structured_output_tool.tool_name

Expand Down
Loading