Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 149 additions & 30 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

from anthropic import Stream, AsyncStream
from anthropic.resources import AsyncMessages, Messages
from anthropic.lib.streaming._messages import MessageStreamManager

from anthropic.types import (
MessageStartEvent,
Expand All @@ -59,7 +60,15 @@
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

from anthropic.types import RawMessageStreamEvent
from anthropic import AsyncStream
from anthropic.types import (
RawMessageStreamEvent,
MessageParam,
ModelParam,
TextBlockParam,
ToolUnionParam,
MessageStream,
)


class _RecordedUsage:
Expand All @@ -84,6 +93,11 @@ def setup_once() -> None:
Messages.create = _wrap_message_create(Messages.create)
AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)

Messages.stream = _wrap_message_stream(Messages.stream)
MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter(
MessageStreamManager.__enter__
)


def _capture_exception(exc: "Any") -> None:
set_span_errored()
Expand Down Expand Up @@ -253,27 +267,32 @@ def _transform_system_instructions(
]


def _set_input_data(
span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration"
def _set_common_input_data(
span: "Span",
integration: "AnthropicIntegration",
max_tokens: "int",
messages: "Iterable[MessageParam]",
model: "ModelParam",
system: "Optional[Union[str, Iterable[TextBlockParam]]]",
temperature: "Optional[float]",
top_k: "Optional[int]",
top_p: "Optional[float]",
tools: "Optional[Iterable[ToolUnionParam]]",
) -> None:
"""
Set input data for the span based on the provided keyword arguments for the anthropic message creation.
"""
span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat")
system_instructions: "Union[str, Iterable[TextBlockParam]]" = kwargs.get("system") # type: ignore
messages = kwargs.get("messages")
if (
messages is not None
and len(messages) > 0
and len(messages) > 0 # type: ignore
and should_send_default_pii()
and integration.include_prompts
):
if isinstance(system_instructions, str) or isinstance(
system_instructions, Iterable
):
if isinstance(system, str) or isinstance(system, Iterable):
span.set_data(
SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
json.dumps(_transform_system_instructions(system_instructions)),
json.dumps(_transform_system_instructions(system)),
)

normalized_messages = []
Expand Down Expand Up @@ -329,32 +348,48 @@ def _set_input_data(
span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False
)

if max_tokens is not None and _is_given(max_tokens):
span.set_data(SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)
if model is not None and _is_given(model):
span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model)
if temperature is not None and _is_given(temperature):
span.set_data(SPANDATA.GEN_AI_REQUEST_TEMPERATURE, temperature)
if top_k is not None and _is_given(top_k):
span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_K, top_k)
if top_p is not None and _is_given(top_p):
span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_P, top_p)

if tools is not None and _is_given(tools) and len(tools) > 0: # type: ignore
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))


def _set_create_input_data(
    span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration"
) -> None:
    """
    Set input data for the span based on the provided keyword arguments for
    the anthropic message creation.

    :param span: the AI Client Span to annotate.
    :param kwargs: keyword arguments the caller passed to ``Messages.create``.
    :param integration: the active ``AnthropicIntegration`` (controls whether
        prompts are captured).
    """
    # ``stream`` is absent unless the caller explicitly requested streaming.
    span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, kwargs.get("stream", False))

    # All remaining request attributes (model, sampling params, messages,
    # tools) are recorded by the shared helper; the per-key attribute loop
    # that used to live here duplicated that work and has been removed.
    _set_common_input_data(
        span=span,
        integration=integration,
        max_tokens=kwargs.get("max_tokens"),  # type: ignore
        messages=kwargs.get("messages"),  # type: ignore
        model=kwargs.get("model"),
        system=kwargs.get("system"),
        temperature=kwargs.get("temperature"),
        top_k=kwargs.get("top_k"),
        top_p=kwargs.get("top_p"),
        tools=kwargs.get("tools"),
    )


def _wrap_synchronous_message_iterator(
iterator: "Iterator[RawMessageStreamEvent]",
iterator: "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]",
span: "Span",
integration: "AnthropicIntegration",
) -> "Iterator[RawMessageStreamEvent]":
) -> "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]":
"""
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span.
Expand Down Expand Up @@ -546,7 +581,7 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
)
span.__enter__()

_set_input_data(span, kwargs, integration)
_set_create_input_data(span, kwargs, integration)

result = yield f, args, kwargs

Expand Down Expand Up @@ -674,6 +709,90 @@ async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
return _sentry_patched_create_async


def _wrap_message_stream(f: "Any") -> "Any":
"""
Attaches user-provided arguments to the returned context manager.
The attributes are set on AI Client Spans in the patch for the context manager.
"""

@wraps(f)
def _sentry_patched_stream(*args: "Any", **kwargs: "Any") -> "MessageStreamManager":
stream_manager = f(*args, **kwargs)

stream_manager._max_tokens = kwargs.get("max_tokens")
stream_manager._messages = kwargs.get("messages")
stream_manager._model = kwargs.get("model")
stream_manager._system = kwargs.get("system")
stream_manager._temperature = kwargs.get("temperature")
stream_manager._top_k = kwargs.get("top_k")
stream_manager._top_p = kwargs.get("top_p")
stream_manager._tools = kwargs.get("tools")

return stream_manager

return _sentry_patched_stream


def _wrap_message_stream_manager_enter(f: "Any") -> "Any":
    """
    Creates and manages AI Client Spans.
    """

    @wraps(f)
    def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream":
        stream = f(self)

        # Only managers produced by the patched ``Messages.stream`` carry
        # the stashed request attributes; leave anything else untouched.
        if not hasattr(self, "_max_tokens"):
            return stream

        integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
        if integration is None or self._messages is None:
            return stream

        # Skip instrumentation entirely when messages is not iterable.
        try:
            iter(self._messages)
        except TypeError:
            return stream

        model = "" if self._model is None else self._model

        span = get_start_span_function()(
            op=OP.GEN_AI_CHAT,
            name=f"chat {model}".strip(),
            origin=AnthropicIntegration.origin,
        )
        # The span is closed by the wrapped response iterator below.
        span.__enter__()

        span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
        _set_common_input_data(
            span=span,
            integration=integration,
            max_tokens=self._max_tokens,
            messages=self._messages,
            model=model,
            system=self._system,
            temperature=self._temperature,
            top_k=self._top_k,
            top_p=self._top_p,
            tools=self._tools,
        )

        stream._iterator = _wrap_synchronous_message_iterator(
            iterator=stream._iterator,
            span=span,
            integration=integration,
        )

        return stream

    return _sentry_patched_enter


def _is_given(obj: "Any") -> bool:
"""
Check for givenness safely across different anthropic versions.
Expand Down
62 changes: 49 additions & 13 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,29 @@
from collections.abc import Iterator

try:
from anyio import create_memory_object_stream, create_task_group, EndOfStream
from mcp.types import (
JSONRPCMessage,
JSONRPCNotification,
JSONRPCRequest,
)
from mcp.shared.message import SessionMessage
from httpx import (
ASGITransport,
Request as HttpxRequest,
Response as HttpxResponse,
AsyncByteStream,
AsyncClient,
)
except ImportError:
ASGITransport = None
HttpxRequest = None
HttpxResponse = None
AsyncByteStream = None
AsyncClient = None


try:
from anyio import create_memory_object_stream, create_task_group, EndOfStream
from mcp.types import (
JSONRPCMessage,
JSONRPCNotification,
JSONRPCRequest,
)
from mcp.shared.message import SessionMessage
except ImportError:
create_memory_object_stream = None
create_task_group = None
Expand All @@ -81,12 +90,6 @@
JSONRPCRequest = None
SessionMessage = None

ASGITransport = None
HttpxRequest = None
HttpxResponse = None
AsyncByteStream = None
AsyncClient = None


SENTRY_EVENT_SCHEMA = "./checkouts/data-schemas/relay/event.schema.json"

Expand Down Expand Up @@ -1013,6 +1016,39 @@ async def inner(values):
return inner


@pytest.fixture
def server_side_event_chunks():
    """Yield SSE-formatted byte chunks built from a sequence of event models."""

    def inner(events):
        for event in events:
            payload = event.model_dump()
            body = json.dumps(payload)
            yield f"event: {payload['type']}\ndata: {body}\n\n".encode("utf-8")

    return inner


@pytest.fixture
def get_model_response():
    """Wrap response_content in an httpx Response bound to a POST /responses request."""

    def inner(response_content, serialize_pydantic=False):
        # Optionally serialize a pydantic model to its JSON byte form first.
        if serialize_pydantic:
            response_content = json.dumps(response_content.model_dump()).encode(
                "utf-8"
            )

        model_request = HttpxRequest(
            "POST",
            "/responses",
        )
        return HttpxResponse(
            200,
            request=model_request,
            content=response_content,
        )

    return inner


class MockServerRequestHandler(BaseHTTPRequestHandler):
def do_GET(self): # noqa: N802
# Process an HTTP GET request and return a response.
Expand Down
Loading
Loading