Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f69fc61
Add distributed SSE event stream store
MackinnonBuck Dec 16, 2025
e344927
Small fixes and test improvements
MackinnonBuck Jan 6, 2026
f70bd12
Adjust for latest changes to `ISseEventStreamStore`
MackinnonBuck Jan 9, 2026
984df24
Clean up tests, throw on expired cache entries
MackinnonBuck Jan 10, 2026
80b2cb4
Add logging
MackinnonBuck Jan 15, 2026
bcbf247
Store retry interval
MackinnonBuck Jan 15, 2026
66083ea
Use span-based APIs for event ID parsing
MackinnonBuck Jan 15, 2026
17867ee
Add shorter timeout on `Client_CanResumeUnsolicitedMessageStream_Afte…
MackinnonBuck Jan 15, 2026
f59c0e0
Use longer timeout in test
MackinnonBuck Jan 16, 2026
1fae1f6
Use versioned cache keys
MackinnonBuck Jan 16, 2026
e2d3ba0
Fix flaky test
MackinnonBuck Jan 16, 2026
114feac
Amend test fix
MackinnonBuck Jan 16, 2026
76c064a
Update log message
MackinnonBuck Jan 16, 2026
0a0bc54
Lengthen unusually short test timeouts
MackinnonBuck Jan 16, 2026
3238d65
Remove redundant CTS
MackinnonBuck Jan 16, 2026
d3f70cf
Delay flushing the unsolicited message stream
MackinnonBuck Jan 20, 2026
3403355
Lengthen timeouts
MackinnonBuck Jan 20, 2026
5ea239d
Allow any OCE in test
MackinnonBuck Jan 20, 2026
71a5758
Merge remote-tracking branch 'origin/main' into mbuck/distributed-sse…
MackinnonBuck Feb 2, 2026
3f6f282
Move `DistributedCacheEventStreamStore` to `ModelContextProtocol`
MackinnonBuck Feb 2, 2026
4b241af
PR feedback: make `isCompleted` a parameter
MackinnonBuck Feb 3, 2026
2c89e60
Update src/ModelContextProtocol/Server/DistributedCacheEventStreamSto…
MackinnonBuck Feb 3, 2026
4b05763
Update src/ModelContextProtocol/Server/DistributedCacheEventStreamSto…
MackinnonBuck Feb 3, 2026
e1f0c46
PR feedback: base64-encode stream metadata cache key
MackinnonBuck Feb 3, 2026
55d9ac0
PR feedback: E2E tests
MackinnonBuck Feb 3, 2026
3547f03
`PollingInterval` -> `StreamReaderPollingInterval`
MackinnonBuck Feb 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
<ItemGroup>
<PackageVersion Include="Microsoft.Extensions.AI" Version="$(MicrosoftExtensionsVersion)" />
<PackageVersion Include="Microsoft.Extensions.AI.Abstractions" Version="$(MicrosoftExtensionsVersion)" />
<PackageVersion Include="Microsoft.Extensions.Caching.Abstractions" Version="$(System10Version)" />
<PackageVersion Include="Microsoft.Extensions.Caching.Memory" Version="$(System10Version)" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="$(System10Version)" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="$(System10Version)" />
</ItemGroup>
Expand Down
1 change: 1 addition & 0 deletions src/ModelContextProtocol/ModelContextProtocol.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
</ItemGroup>

Expand Down
117 changes: 117 additions & 0 deletions src/ModelContextProtocol/Server/DistributedCacheEventIdFormatter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

// This is a shared source file included in both ModelContextProtocol and the test project.
// Do not reference symbols internal to the core project, as they won't be available in tests.
#if NET
using System.Buffers;
using System.Buffers.Text;
using System.Diagnostics.CodeAnalysis;

#endif
using System.Text;

namespace ModelContextProtocol.Server;

/// <summary>
/// Provides methods for formatting and parsing event IDs used by <see cref="DistributedCacheEventStreamStore"/>.
/// </summary>
/// <remarks>
/// Event IDs are formatted as "{base64(sessionId)}:{base64(streamId)}:{sequence}".
/// </remarks>
internal static class DistributedCacheEventIdFormatter
{
private const char Separator = ':';

/// <summary>
/// Formats session ID, stream ID, and sequence number into an event ID string.
/// </summary>
public static string Format(string sessionId, string streamId, long sequence)
{
// Base64-encode session and stream IDs so the event ID can be parsed
// even if the original IDs contain the ':' separator character
var sessionBase64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(sessionId));
var streamBase64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(streamId));
return $"{sessionBase64}{Separator}{streamBase64}{Separator}{sequence}";
}

/// <summary>
/// Attempts to parse an event ID into its component parts.
/// </summary>
public static bool TryParse(string eventId, out string sessionId, out string streamId, out long sequence)
{
sessionId = string.Empty;
streamId = string.Empty;
sequence = 0;

#if NET
ReadOnlySpan<char> eventIdSpan = eventId.AsSpan();
Span<Range> partRanges = stackalloc Range[4];
int rangeCount = eventIdSpan.Split(partRanges, Separator);
if (rangeCount != 3)
{
return false;
}

try
{
ReadOnlySpan<char> sessionBase64 = eventIdSpan[partRanges[0]];
ReadOnlySpan<char> streamBase64 = eventIdSpan[partRanges[1]];
ReadOnlySpan<char> sequenceSpan = eventIdSpan[partRanges[2]];

if (!TryDecodeBase64ToString(sessionBase64, out sessionId!) ||
!TryDecodeBase64ToString(streamBase64, out streamId!))
{
return false;
}

return long.TryParse(sequenceSpan, out sequence);
}
catch
{
return false;
}
#else
var parts = eventId.Split(Separator);
if (parts.Length != 3)
{
return false;
}

try
{
sessionId = Encoding.UTF8.GetString(Convert.FromBase64String(parts[0]));
streamId = Encoding.UTF8.GetString(Convert.FromBase64String(parts[1]));
return long.TryParse(parts[2], out sequence);
}
catch
{
return false;
}
#endif
}

#if NET
private static bool TryDecodeBase64ToString(ReadOnlySpan<char> base64Chars, [NotNullWhen(true)] out string? result)
{
// Use a single buffer: base64 chars are ASCII (1:1 with UTF8 bytes),
// and decoded data is always smaller than encoded, so we can decode in-place.
int bufferLength = base64Chars.Length;
Span<byte> buffer = bufferLength <= 256
? stackalloc byte[bufferLength]
: new byte[bufferLength];

Encoding.UTF8.GetBytes(base64Chars, buffer);

OperationStatus status = Base64.DecodeFromUtf8InPlace(buffer, out int bytesWritten);
if (status != OperationStatus.Done)
{
result = null;
return false;
}

result = Encoding.UTF8.GetString(buffer[..bytesWritten]);
return true;
}
#endif
}
Loading