Skip to content

Commit 3f45faf

Browse files
committed
Introduce retry handling
Signed-off-by: Daniel Garnier-Moiroux <git@garnier.wf>
1 parent fa1861e commit 3f45faf

File tree

6 files changed

+221
-180
lines changed

6 files changed

+221
-180
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import reactor.core.publisher.Mono;
5252
import reactor.util.function.Tuple2;
5353
import reactor.util.function.Tuples;
54+
import reactor.util.retry.Retry;
5455

5556
/**
5657
* An implementation of the Streamable HTTP protocol as defined by the
@@ -291,26 +292,17 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
291292
})).flatMap(responseEvent -> {
292293
int statusCode = responseEvent.responseInfo().statusCode();
293294
if (statusCode == 401 || statusCode == 403) {
294-
logger.debug("Authorization error in sendMessage with code {}", statusCode);
295-
return Mono.deferContextual(innerCtx -> {
296-
var transportContext = innerCtx.getOrDefault(McpTransportContext.KEY,
297-
McpTransportContext.EMPTY);
298-
return Mono.from(this.authorizationErrorHandler.onAuthorizationError(
299-
responseEvent.responseInfo(), transportContext, Mono.defer(() -> {
300-
logger.debug("Authorization error handled, retrying original request");
301-
return this.reconnect(stream).then();
302-
}),
303-
Mono.error(new McpHttpClientTransportException(
304-
"Authorization error connecting to SSE stream",
305-
responseEvent.responseInfo()))))
306-
.then(Mono.empty());
307-
});
295+
logger.debug("Authorization error in reconnect with code {}", statusCode);
296+
return Mono.<McpSchema.JSONRPCMessage>error(
297+
new McpHttpClientTransportAuthorizationException(
298+
"Authorization error connecting to SSE stream",
299+
responseEvent.responseInfo()));
308300
}
309301

310302
if (!(responseEvent instanceof ResponseSubscribers.SseResponseEvent sseResponseEvent)) {
311-
return Flux.<McpSchema.JSONRPCMessage>error(new McpHttpClientTransportException(
312-
"Unrecognized server error when connecting to SSE stream",
313-
responseEvent.responseInfo()));
303+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
304+
"Unrecognized server error when connecting to SSE stream, status code: "
305+
+ statusCode));
314306
}
315307
else if (statusCode >= 200 && statusCode < 300) {
316308
if (MESSAGE_EVENT_TYPE.equals(sseResponseEvent.sseEvent().event())) {
@@ -389,6 +381,7 @@ else if (statusCode == BAD_REQUEST) {
389381
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
390382
"Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
391383
})
384+
.retryWhen(authorizationErrorRetrySpec())
392385
.flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
393386
.onErrorMap(CompletionException.class, t -> t.getCause())
394387
.onErrorComplete(t -> {
@@ -411,6 +404,25 @@ else if (statusCode == BAD_REQUEST) {
411404

412405
}
413406

407+
private Retry authorizationErrorRetrySpec() {
408+
return Retry.from(companion -> companion.flatMap(retrySignal -> {
409+
if (!(retrySignal.failure() instanceof McpHttpClientTransportAuthorizationException authException)) {
410+
return Mono.error(retrySignal.failure());
411+
}
412+
if (retrySignal.totalRetriesInARow() >= this.authorizationErrorHandler.maxRetries()) {
413+
return Mono.error(retrySignal.failure());
414+
}
415+
return Mono.deferContextual(ctx -> {
416+
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
417+
return Mono
418+
.from(this.authorizationErrorHandler.handle(authException.getResponseInfo(), transportContext))
419+
.switchIfEmpty(Mono.just(false))
420+
.flatMap(shouldRetry -> shouldRetry ? Mono.just(retrySignal.totalRetries())
421+
: Mono.error(retrySignal.failure()));
422+
});
423+
}));
424+
}
425+
414426
private BodyHandler<Void> toSendMessageBodySubscriber(FluxSink<ResponseEvent> sink) {
415427

416428
BodyHandler<Void> responseBodyHandler = responseInfo -> {
@@ -492,17 +504,8 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
492504
int statusCode = responseEvent.responseInfo().statusCode();
493505
if (statusCode == 401 || statusCode == 403) {
494506
logger.debug("Authorization error in sendMessage with code {}", statusCode);
495-
return Mono.deferContextual(ctx -> {
496-
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
497-
return Mono.from(this.authorizationErrorHandler
498-
.onAuthorizationError(responseEvent.responseInfo(), transportContext, Mono.defer(() -> {
499-
logger.debug("Authorization error handled, retrying original request");
500-
return this.sendMessage(sentMessage);
501-
}), Mono.error(new McpHttpClientTransportException(
502-
"Authorization error when sending message", responseEvent.responseInfo()))))
503-
.doOnSuccess(s -> deliveredSink.success())
504-
.then(Mono.empty());
505-
});
507+
return Mono.<McpSchema.JSONRPCMessage>error(new McpHttpClientTransportAuthorizationException(
508+
"Authorization error when sending message", responseEvent.responseInfo()));
506509
}
507510

508511
if (transportSession.markInitialized(
@@ -630,6 +633,7 @@ else if (statusCode == BAD_REQUEST) {
630633
return Flux.<McpSchema.JSONRPCMessage>error(
631634
new RuntimeException("Failed to send message: " + responseEvent));
632635
})
636+
.retryWhen(authorizationErrorRetrySpec())
633637
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
634638
.onErrorMap(CompletionException.class, t -> t.getCause())
635639
.onErrorComplete(t -> {
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2026-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.net.http.HttpResponse;
8+
9+
import io.modelcontextprotocol.spec.McpTransportException;
10+
11+
/**
12+
* Thrown when the MCP server responds with an authorization error (HTTP 401 or HTTP 403).
13+
* Subclass of {@link McpTransportException} for targeted retry handling in
14+
* {@link HttpClientStreamableHttpTransport}.
15+
*
16+
* @author Daniel Garnier-Moiroux
17+
*/
18+
public class McpHttpClientTransportAuthorizationException extends McpTransportException {
19+
20+
private final HttpResponse.ResponseInfo responseInfo;
21+
22+
public McpHttpClientTransportAuthorizationException(String message, HttpResponse.ResponseInfo responseInfo) {
23+
super(message);
24+
this.responseInfo = responseInfo;
25+
}
26+
27+
public HttpResponse.ResponseInfo getResponseInfo() {
28+
return responseInfo;
29+
}
30+
31+
}

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportException.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import java.net.http.HttpResponse;
88

9-
import io.modelcontextprotocol.client.transport.McpHttpClientTransportException;
9+
import io.modelcontextprotocol.client.transport.McpHttpClientTransportAuthorizationException;
1010
import io.modelcontextprotocol.common.McpTransportContext;
1111
import org.reactivestreams.Publisher;
1212
import reactor.core.publisher.Mono;
@@ -27,13 +27,13 @@ public interface McpHttpClientAuthorizationErrorHandler {
2727
* Handle authorization error (HTTP 401 or 403), and signal whether the HTTP request
2828
* should be retried or not. If the publisher returns true, the original transport
2929
* method (connect, sendMessage) will be replayed with the original arguments.
30-
* Otherwise, the transport will throw an {@link McpHttpClientTransportException},
31-
* indicating the error status.
30+
* Otherwise, the transport will throw an
31+
* {@link McpHttpClientTransportAuthorizationException}, indicating the error status.
3232
* <p>
3333
* If the returned {@link Publisher} errors, the error will be propagated to the
3434
* calling method, to be handled by the caller.
3535
* <p>
36-
* The caller is responsible for bounding the number of retries.
36+
* The number of retries is bounded by {@link #maxRetries()}.
3737
* @param responseInfo the HTTP response information
3838
* @param context the MCP client transport context
3939
* @return {@link Publisher} emitting true if the original request should be replayed,
@@ -42,36 +42,23 @@ public interface McpHttpClientAuthorizationErrorHandler {
4242
Publisher<Boolean> handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context);
4343

4444
/**
45-
* A no-op handler, used in the default use-case.
45+
* Maximum number of authorization error retries the transport will attempt. When the
46+
* handler signals a retry via {@link #handle}, the transport will replay the original
47+
* request at most this many times. If the authorization error persists after
48+
* exhausting all retries, the transport will propagate the
49+
* {@link McpHttpClientTransportAuthorizationException}.
50+
* <p>
51+
* Defaults to {@code 1}.
52+
* @return the maximum number of retries
4653
*/
47-
McpHttpClientAuthorizationErrorHandler NOOP = new Noop();
54+
default int maxRetries() {
55+
return 1;
56+
}
4857

4958
/**
50-
* Handle authorization error (HTTP 401 or 403), and optionally retry the HTTP
51-
* request, or trigger a transport error. To retry, use the {@code retryAction}
52-
* publisher. To emit the default transport error, use the {@code defaultError}
53-
* publisher.
54-
* <p>
55-
* Optionally, the returned {@link Publisher} may error to trigger an out-of-band
56-
* action. In that case, the error will be propagated to the calling method, to be
57-
* handled by the caller.
58-
* <p>
59-
* Defaults to {@link #handle(HttpResponse.ResponseInfo, McpTransportContext)}, and
60-
* uses the boolean from the return value to decide whether it should retry the
61-
* request.
62-
* @param responseInfo the HTTP response information
63-
* @param context the MCP client transport context
64-
* @param retryAction handler to retry the original request
65-
* @param defaultError handler to emit an error
66-
* @return a {@link Publisher} to signal either an error or a retry
59+
* A no-op handler, used in the default use-case.
6760
*/
68-
default Publisher<Void> onAuthorizationError(HttpResponse.ResponseInfo responseInfo, McpTransportContext context,
69-
Publisher<Void> retryAction, Publisher<Void> defaultError) {
70-
return Mono.from(this.handle(responseInfo, context))
71-
.switchIfEmpty(Mono.just(false))
72-
.flatMap(shouldRetry -> shouldRetry != null && shouldRetry ? Mono.from(retryAction)
73-
: Mono.from(defaultError));
74-
}
61+
McpHttpClientAuthorizationErrorHandler NOOP = new Noop();
7562

7663
/**
7764
* Create a {@link McpHttpClientAuthorizationErrorHandler} from a synchronous handler.
@@ -95,7 +82,8 @@ interface Sync {
9582
* request should be retried or not. If the return value is true, the original
9683
* transport method (connect, sendMessage) will be replayed with the original
9784
* arguments. Otherwise, the transport will throw an
98-
* {@link McpHttpClientTransportException}, indicating the error status.
85+
* {@link McpHttpClientTransportAuthorizationException}, indicating the error
86+
* status.
9987
* @param responseInfo the HTTP response information
10088
* @param context the MCP client transport context
10189
* @return true if the original request should be replayed, false otherwise.

mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandlerTest.java

Lines changed: 20 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
import java.net.http.HttpResponse;
77

88
import io.modelcontextprotocol.common.McpTransportContext;
9-
import org.junit.jupiter.api.Nested;
109
import org.junit.jupiter.api.Test;
11-
import reactor.core.publisher.Mono;
1210
import reactor.test.StepVerifier;
1311

1412
import static org.mockito.Mockito.mock;
@@ -22,85 +20,29 @@ class McpHttpClientAuthorizationErrorHandlerTest {
2220

2321
private final McpTransportContext context = McpTransportContext.EMPTY;
2422

25-
@Nested
26-
class OnAuthorizationError {
27-
28-
@Test
29-
void whenTrueThenRetry() {
30-
McpHttpClientAuthorizationErrorHandler handler = (info, ctx) -> Mono.just(true);
31-
Mono<Void> retryAction = Mono.empty();
32-
Mono<Void> defaultError = Mono.error(new RuntimeException("should not be called"));
33-
34-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
35-
.verifyComplete();
36-
}
37-
38-
@Test
39-
void whenFalseThenError() {
40-
McpHttpClientAuthorizationErrorHandler handler = (info, ctx) -> Mono.just(false);
41-
Mono<Void> retryAction = Mono.error(new RuntimeException("should not be called"));
42-
Mono<Void> defaultError = Mono.error(new RuntimeException("authorization error"));
43-
44-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
45-
.expectErrorMatches(t -> t instanceof RuntimeException && t.getMessage().equals("authorization error"))
46-
.verify();
47-
}
48-
49-
@Test
50-
void whenErrorThenPropagate() {
51-
McpHttpClientAuthorizationErrorHandler handler = (info, ctx) -> Mono
52-
.error(new IllegalStateException("handler error"));
53-
Mono<Void> retryAction = Mono.error(new RuntimeException("should not be called"));
54-
Mono<Void> defaultError = Mono.error(new RuntimeException("should not be called"));
55-
56-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
57-
.expectErrorMatches(t -> t instanceof IllegalStateException && t.getMessage().equals("handler error"))
58-
.verify();
59-
}
60-
23+
@Test
24+
void whenTrueThenRetry() {
25+
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
26+
.fromSync((info, ctx) -> true);
27+
StepVerifier.create(handler.handle(responseInfo, context)).expectNext(true).verifyComplete();
6128
}
6229

63-
@Nested
64-
class FromSync {
65-
66-
@Test
67-
void whenTrueThenRetry() {
68-
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
69-
.fromSync((info, ctx) -> true);
70-
Mono<Void> retryAction = Mono.empty();
71-
Mono<Void> defaultError = Mono.error(new RuntimeException("should not be called"));
72-
73-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
74-
.verifyComplete();
75-
}
76-
77-
@Test
78-
void whenFalseThenError() {
79-
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
80-
.fromSync((info, ctx) -> false);
81-
Mono<Void> retryAction = Mono.error(new RuntimeException("should not be called"));
82-
Mono<Void> defaultError = Mono.error(new RuntimeException("authorization error"));
83-
84-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
85-
.expectErrorMatches(t -> t instanceof RuntimeException && t.getMessage().equals("authorization error"))
86-
.verify();
87-
}
88-
89-
@Test
90-
void whenExceptionThenPropagate() {
91-
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
92-
.fromSync((info, ctx) -> {
93-
throw new IllegalStateException("sync handler error");
94-
});
95-
Mono<Void> retryAction = Mono.error(new RuntimeException("should not be called"));
96-
Mono<Void> defaultError = Mono.error(new RuntimeException("should not be called"));
97-
98-
StepVerifier.create(handler.onAuthorizationError(responseInfo, context, retryAction, defaultError))
99-
.expectErrorMatches(
100-
t -> t instanceof IllegalStateException && t.getMessage().equals("sync handler error"))
101-
.verify();
102-
}
30+
@Test
31+
void whenFalseThenError() {
32+
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
33+
.fromSync((info, ctx) -> false);
34+
StepVerifier.create(handler.handle(responseInfo, context)).expectNext(false).verifyComplete();
35+
}
10336

37+
@Test
38+
void whenExceptionThenPropagate() {
39+
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
40+
.fromSync((info, ctx) -> {
41+
throw new IllegalStateException("sync handler error");
42+
});
43+
StepVerifier.create(handler.handle(responseInfo, context))
44+
.expectErrorMatches(t -> t instanceof IllegalStateException && t.getMessage().equals("sync handler error"))
45+
.verify();
10446
}
10547

10648
}

0 commit comments

Comments
 (0)