Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions packages/client/src/client/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,22 @@ export class StdioClientTransport implements Transport {
}

send(message: JSONRPCMessage): Promise<void> {
return new Promise(resolve => {
if (!this._process?.stdin) {
throw new SdkError(SdkErrorCode.NotConnected, 'Not connected');
}
return new Promise((resolve, reject) => {
try {
if (!this._process?.stdin) {
throw new SdkError(SdkErrorCode.NotConnected, 'Not connected');
}

const json = serializeMessage(message);
if (this._process.stdin.write(json)) {
resolve();
} else {
this._process.stdin.once('drain', resolve);
const json = serializeMessage(message);
if (this._process.stdin.write(json)) {
resolve();
} else {
this._process.stdin.once('drain', resolve);
}
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
try { this.onerror?.(err); } catch { /* handler error should not mask transport error */ }
reject(err);
}
});
}
Expand Down
20 changes: 14 additions & 6 deletions packages/client/src/client/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,21 @@ export class WebSocketClientTransport implements Transport {

send(message: JSONRPCMessage): Promise<void> {
return new Promise((resolve, reject) => {
if (!this._socket) {
reject(new Error('Not connected'));
return;
}
try {
if (!this._socket) {
const err = new Error('Not connected');
try { this.onerror?.(err); } catch { /* handler error should not mask transport error */ }
reject(err);
return;
}

this._socket?.send(JSON.stringify(message));
resolve();
this._socket.send(JSON.stringify(message));
resolve();
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
try { this.onerror?.(err); } catch { /* handler error should not mask transport error */ }
reject(err);
}
});
}
}
18 changes: 12 additions & 6 deletions packages/server/src/server/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,18 @@ export class StdioServerTransport implements Transport {
}

send(message: JSONRPCMessage): Promise<void> {
return new Promise(resolve => {
const json = serializeMessage(message);
if (this._stdout.write(json)) {
resolve();
} else {
this._stdout.once('drain', resolve);
return new Promise((resolve, reject) => {
try {
const json = serializeMessage(message);
if (this._stdout.write(json)) {
resolve();
} else {
this._stdout.once('drain', resolve);
}
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
try { this.onerror?.(err); } catch { /* handler error should not mask transport error */ }
reject(err);
}
});
}
Expand Down
68 changes: 55 additions & 13 deletions packages/server/src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
private _enableDnsRebindingProtection: boolean;
private _retryInterval?: number;
private _supportedProtocolVersions: string[];
private _isClosing = false;

sessionId?: string;
onclose?: () => void;
Expand Down Expand Up @@ -388,7 +389,14 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
return;
}

const primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage);
let primingEventId: string;
try {
primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage);
} catch (error) {
const err = error as Error;
try { this.onerror?.(err); } catch { /* handler error should not mask transport error */ }
throw err;
}

let primingEvent = `id: ${primingEventId}\ndata: \n\n`;
if (this._retryInterval !== undefined) {
Expand Down Expand Up @@ -887,14 +895,30 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
}

async close(): Promise<void> {
// Close all SSE connections
for (const { cleanup } of this._streamMapping.values()) {
cleanup();
if (this._isClosing) return;
this._isClosing = true;

try {
// Snapshot and clear the mapping before calling cleanup functions
// to prevent re-entrant deletes from cancel callbacks
const entries = [...this._streamMapping.values()];
this._streamMapping.clear();

// Close all SSE connections
for (const { cleanup } of entries) {
try {
cleanup();
} catch {
// Suppress cleanup errors
}
}

// Clear any pending responses
this._requestResponseMap.clear();
} finally {
// Don't reset _isClosing - once closed, stay closed
}
this._streamMapping.clear();

// Clear any pending responses
this._requestResponseMap.clear();
this.onclose?.();
}

Expand Down Expand Up @@ -937,15 +961,23 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
if (requestId === undefined) {
// For standalone SSE streams, we can only send requests and notifications
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request');
const err = new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request');
try { this.onerror?.(err); } catch { /* handler error should not mask transport error */ }
throw err;
}

// Generate and store event ID if event store is provided
// Store even if stream is disconnected so events can be replayed on reconnect
let eventId: string | undefined;
if (this._eventStore) {
// Stores the event and gets the generated event ID
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
try {
// Stores the event and gets the generated event ID
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
} catch (error) {
const err = error as Error;
try { this.onerror?.(err); } catch { /* handler error should not mask transport error */ }
throw err;
}
}

const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
Expand All @@ -964,7 +996,9 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
// Get the response for this request
const streamId = this._requestToStreamMapping.get(requestId);
if (!streamId) {
throw new Error(`No connection established for request ID: ${String(requestId)}`);
const err = new Error(`No connection established for request ID: ${String(requestId)}`);
try { this.onerror?.(err); } catch { /* handler error should not mask transport error */ }
throw err;
}

const stream = this._streamMapping.get(streamId);
Expand All @@ -974,7 +1008,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
let eventId: string | undefined;

if (this._eventStore) {
eventId = await this._eventStore.storeEvent(streamId, message);
try {
eventId = await this._eventStore.storeEvent(streamId, message);
} catch (error) {
const err = error as Error;
try { this.onerror?.(err); } catch { /* handler error should not mask transport error */ }
throw err;
}
}
// Write the event to the response stream
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
Expand All @@ -989,7 +1029,9 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {

if (allResponsesReady) {
if (!stream) {
throw new Error(`No connection established for request ID: ${String(requestId)}`);
const err = new Error(`No connection established for request ID: ${String(requestId)}`);
try { this.onerror?.(err); } catch { /* handler error should not mask transport error */ }
throw err;
}
if (this._enableJsonResponse && stream.resolveJson) {
// All responses ready, send as JSON
Expand Down
Loading