Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

/**
* The Model Context Protocol (MCP) client implementation that provides asynchronous
Expand Down Expand Up @@ -181,7 +182,7 @@ public class McpAsyncClient {
* @param features the MCP Client supported features. responses against output
* schemas.
*/
McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
public McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
JsonSchemaValidator jsonSchemaValidator, McpClientFeatures.Async features) {

Assert.notNull(transport, "Transport must not be null");
Expand Down Expand Up @@ -317,13 +318,20 @@ public class McpAsyncClient {
};

this.initializer = new LifecycleInitializer(clientCapabilities, clientInfo, transport.protocolVersions(),
initializationTimeout, ctx -> new McpClientSession(requestTimeout, transport, requestHandlers,
notificationHandlers, con -> con.contextWrite(ctx)),
initializationTimeout,
ctx -> buildClientSession(requestTimeout, transport, requestHandlers, notificationHandlers, ctx),
postInitializationHook);

this.transport.setExceptionHandler(this.initializer::handleException);
}

protected McpClientSession buildClientSession(Duration requestTimeout, McpClientTransport transport,
Map<String, RequestHandler<?>> requestHandlers, Map<String, NotificationHandler> notificationHandlers,
ContextView ctx) {
return new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers,
con -> con.contextWrite(ctx));
}

/**
* Get the current initialization result.
* @return the initialization result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* @see McpSchema.Implementation
* @see McpSchema.ClientCapabilities
*/
class McpClientFeatures {
public class McpClientFeatures {

/**
* Asynchronous client features specification providing the capabilities and request
Expand All @@ -64,7 +64,7 @@ class McpClientFeatures {
* @param elicitationHandler the elicitation handler.
* @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
public record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map<String, McpSchema.Root> roots, List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers,
List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers,
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class McpSyncClient implements AutoCloseable {
// is not a requirement?
private static final long DEFAULT_CLOSE_TIMEOUT_MS = 10_000L;

private final McpAsyncClient delegate;
protected final McpAsyncClient delegate;

private final Supplier<McpTransportContext> contextProvider;

Expand All @@ -75,7 +75,7 @@ public class McpSyncClient implements AutoCloseable {
* @param contextProvider the supplier of context before calling any non-blocking
* operation on underlying delegate
*/
McpSyncClient(McpAsyncClient delegate, Supplier<McpTransportContext> contextProvider) {
public McpSyncClient(McpAsyncClient delegate, Supplier<McpTransportContext> contextProvider) {
Assert.notNull(delegate, "The delegate can not be null");
Assert.notNull(contextProvider, "The contextProvider can not be null");
this.delegate = delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,35 +91,35 @@ public class McpAsyncServer {

private static final Logger logger = LoggerFactory.getLogger(McpAsyncServer.class);

private final McpServerTransportProviderBase mcpTransportProvider;
protected final McpServerTransportProviderBase mcpTransportProvider;

private final McpJsonMapper jsonMapper;
protected final McpJsonMapper jsonMapper;

private final JsonSchemaValidator jsonSchemaValidator;
protected final JsonSchemaValidator jsonSchemaValidator;

private final McpSchema.ServerCapabilities serverCapabilities;
protected final McpSchema.ServerCapabilities serverCapabilities;

private final McpSchema.Implementation serverInfo;
protected final McpSchema.Implementation serverInfo;

private final String instructions;
protected final String instructions;

private final CopyOnWriteArrayList<McpServerFeatures.AsyncToolSpecification> tools = new CopyOnWriteArrayList<>();
protected final CopyOnWriteArrayList<McpServerFeatures.AsyncToolSpecification> tools = new CopyOnWriteArrayList<>();

private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceSpecification> resources = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceSpecification> resources = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();

// FIXME: this field is deprecated and should be remvoed together with the
// broadcasting loggingNotification.
private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG;

private final ConcurrentHashMap<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions = new ConcurrentHashMap<>();

private List<String> protocolVersions;
protected List<String> protocolVersions;

private McpUriTemplateManagerFactory uriTemplateManagerFactory = new DefaultMcpUriTemplateManagerFactory();
protected McpUriTemplateManagerFactory uriTemplateManagerFactory = new DefaultMcpUriTemplateManagerFactory();

/**
* Create a new McpAsyncServer with the given transport provider and capabilities.
Expand All @@ -128,7 +128,7 @@ public class McpAsyncServer {
* @param features The MCP server supported features.
* @param jsonMapper The JsonMapper to use for JSON serialization/deserialization
*/
McpAsyncServer(McpServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
public McpAsyncServer(McpServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
McpServerFeatures.Async features, Duration requestTimeout,
McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) {
this.mcpTransportProvider = mcpTransportProvider;
Expand All @@ -153,7 +153,7 @@ public class McpAsyncServer {
requestTimeout, transport, this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
}

McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
public McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
McpServerFeatures.Async features, Duration requestTimeout,
McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) {
this.mcpTransportProvider = mcpTransportProvider;
Expand All @@ -178,7 +178,7 @@ public class McpAsyncServer {
this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
}

private Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServerFeatures.Async features) {
protected Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServerFeatures.Async features) {
Map<String, McpNotificationHandler> notificationHandlers = new HashMap<>();

notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());
Expand All @@ -196,7 +196,7 @@ private Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServe
return notificationHandlers;
}

private Map<String, McpRequestHandler<?>> prepareRequestHandlers() {
protected Map<String, McpRequestHandler<?>> prepareRequestHandlers() {
Map<String, McpRequestHandler<?>> requestHandlers = new HashMap<>();

// Initialize request handlers for standard MCP methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class McpServerFeatures {
* roots list changes
* @param instructions The server instructions text
*/
record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
public record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
List<McpServerFeatures.AsyncToolSpecification> tools, Map<String, AsyncResourceSpecification> resources,
Map<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates,
Map<String, McpServerFeatures.AsyncPromptSpecification> prompts,
Expand All @@ -59,7 +59,7 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
* the roots list changes
* @param instructions The server instructions text
*/
Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
public Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
List<McpServerFeatures.AsyncToolSpecification> tools, Map<String, AsyncResourceSpecification> resources,
Map<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates,
Map<String, McpServerFeatures.AsyncPromptSpecification> prompts,
Expand Down Expand Up @@ -101,7 +101,7 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
* @return a specification which is protected from blocking calls specified by the
* user.
*/
static Async fromSync(Sync syncSpec, boolean immediateExecution) {
public static Async fromSync(Sync syncSpec, boolean immediateExecution) {
List<McpServerFeatures.AsyncToolSpecification> tools = new ArrayList<>();
for (var tool : syncSpec.tools()) {
tools.add(AsyncToolSpecification.fromSync(tool, immediateExecution));
Expand Down Expand Up @@ -153,7 +153,7 @@ static Async fromSync(Sync syncSpec, boolean immediateExecution) {
* roots list changes
* @param instructions The server instructions text
*/
record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
public record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
List<McpServerFeatures.SyncToolSpecification> tools,
Map<String, McpServerFeatures.SyncResourceSpecification> resources,
Map<String, McpServerFeatures.SyncResourceTemplateSpecification> resourceTemplates,
Expand All @@ -173,7 +173,7 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se
* the roots list changes
* @param instructions The server instructions text
*/
Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
public Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities serverCapabilities,
List<McpServerFeatures.SyncToolSpecification> tools,
Map<String, McpServerFeatures.SyncResourceSpecification> resources,
Map<String, McpServerFeatures.SyncResourceTemplateSpecification> resourceTemplates,
Expand Down Expand Up @@ -354,7 +354,8 @@ public static Builder builder() {
public record AsyncResourceSpecification(McpSchema.Resource resource,
BiFunction<McpAsyncServerExchange, McpSchema.ReadResourceRequest, Mono<McpSchema.ReadResourceResult>> readHandler) {

static AsyncResourceSpecification fromSync(SyncResourceSpecification resource, boolean immediateExecution) {
public static AsyncResourceSpecification fromSync(SyncResourceSpecification resource,
boolean immediateExecution) {
// FIXME: This is temporary, proper validation should be implemented
if (resource == null) {
return null;
Expand Down Expand Up @@ -394,7 +395,7 @@ static AsyncResourceSpecification fromSync(SyncResourceSpecification resource, b
public record AsyncResourceTemplateSpecification(McpSchema.ResourceTemplate resourceTemplate,
BiFunction<McpAsyncServerExchange, McpSchema.ReadResourceRequest, Mono<McpSchema.ReadResourceResult>> readHandler) {

static AsyncResourceTemplateSpecification fromSync(SyncResourceTemplateSpecification resource,
public static AsyncResourceTemplateSpecification fromSync(SyncResourceTemplateSpecification resource,
boolean immediateExecution) {
// FIXME: This is temporary, proper validation should be implemented
if (resource == null) {
Expand Down Expand Up @@ -442,7 +443,7 @@ static AsyncResourceTemplateSpecification fromSync(SyncResourceTemplateSpecifica
public record AsyncPromptSpecification(McpSchema.Prompt prompt,
BiFunction<McpAsyncServerExchange, McpSchema.GetPromptRequest, Mono<McpSchema.GetPromptResult>> promptHandler) {

static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt, boolean immediateExecution) {
public static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt, boolean immediateExecution) {
// FIXME: This is temporary, proper validation should be implemented
if (prompt == null) {
return null;
Expand Down Expand Up @@ -482,7 +483,7 @@ public record AsyncCompletionSpecification(McpSchema.CompleteReference reference
* @return an asynchronous wrapper of the provided sync specification, or
* {@code null} if input is null
*/
static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion,
public static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion,
boolean immediateExecution) {
if (completion == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public class McpStatelessAsyncServer {

private static final Logger logger = LoggerFactory.getLogger(McpStatelessAsyncServer.class);

private final McpStatelessServerTransport mcpTransportProvider;
protected final McpStatelessServerTransport mcpTransportProvider;

private final McpJsonMapper jsonMapper;
protected final McpJsonMapper jsonMapper;

private final McpSchema.ServerCapabilities serverCapabilities;

Expand All @@ -77,7 +77,7 @@ public class McpStatelessAsyncServer {

private final JsonSchemaValidator jsonSchemaValidator;

McpStatelessAsyncServer(McpStatelessServerTransport mcpTransport, McpJsonMapper jsonMapper,
public McpStatelessAsyncServer(McpStatelessServerTransport mcpTransport, McpJsonMapper jsonMapper,
McpStatelessServerFeatures.Async features, Duration requestTimeout,
McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) {
this.mcpTransportProvider = mcpTransport;
Expand Down
Loading