diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.test.ts b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts index ae2bf6da4..a37f3f57d 100644 --- a/packages/kernel-errors/src/errors/ResourceLimitError.test.ts +++ b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts @@ -167,7 +167,7 @@ describe('ResourceLimitError', () => { stack: 'stack trace', } as unknown as MarshaledOcapError, expectedError: - 'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal`, but received: "invalid"', + 'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal | literal | literal`, but received: "invalid"', }, { name: 'invalid current type', diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.ts b/packages/kernel-errors/src/errors/ResourceLimitError.ts index 13026a7c6..d5cc5f39f 100644 --- a/packages/kernel-errors/src/errors/ResourceLimitError.ts +++ b/packages/kernel-errors/src/errors/ResourceLimitError.ts @@ -29,7 +29,11 @@ export class ResourceLimitError extends BaseError { message: string, options?: ErrorOptionsWithStack & { data?: { - limitType?: 'connection' | 'messageSize'; + limitType?: + | 'connection' + | 'messageSize' + | 'messageRate' + | 'connectionRate'; current?: number; limit?: number; }; @@ -50,7 +54,12 @@ export class ResourceLimitError extends BaseError { data: optional( object({ limitType: optional( - union([literal('connection'), literal('messageSize')]), + union([ + literal('connection'), + literal('messageSize'), + literal('messageRate'), + literal('connectionRate'), + ]), ), current: optional(number()), limit: optional(number()), @@ -75,7 +84,11 @@ export class ResourceLimitError extends BaseError { const options = unmarshalErrorOptions(marshaledError); const data = marshaledError.data as | { - limitType?: 'connection' | 'messageSize'; + limitType?: + | 'connection' + | 'messageSize' + | 'messageRate' + | 'connectionRate'; current?: number; limit?: number; } diff --git a/packages/nodejs/test/e2e/remote-comms.test.ts b/packages/nodejs/test/e2e/remote-comms.test.ts index 971eb6f00..0839104d3 100644 --- a/packages/nodejs/test/e2e/remote-comms.test.ts +++ b/packages/nodejs/test/e2e/remote-comms.test.ts @@ -505,12 +505,14 @@ describe.sequential('Remote Communications E2E', () => { it( 'rejects new messages when queue reaches MAX_QUEUE limit', async () => { + // Use high rate limit to avoid rate limiting interference with queue limit test const { aliceRef, bobURL } = await setupAliceAndBob( kernel1, kernel2, kernelStore1, kernelStore2, testRelays, + { maxMessagesPerSecond: 500 }, ); await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']); diff --git a/packages/nodejs/test/helpers/remote-comms.ts b/packages/nodejs/test/helpers/remote-comms.ts index bcd7b80f4..303a31726 100644 --- a/packages/nodejs/test/helpers/remote-comms.ts +++ b/packages/nodejs/test/helpers/remote-comms.ts @@ -1,7 +1,11 @@ import type { KernelDatabase } from '@metamask/kernel-store'; import { stringify } from '@metamask/kernel-utils'; import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel'; -import type { ClusterConfig, KRef } from '@metamask/ocap-kernel'; +import type { + ClusterConfig, + KRef, + RemoteCommsOptions, +} from '@metamask/ocap-kernel'; import { makeTestKernel } from './kernel.ts'; @@ -181,6 +185,7 @@ export async function wait(ms: number): Promise { * @param kernelStore1 - Kernel store for first kernel. * @param kernelStore2 - Kernel store for second kernel. * @param relays - Array of relay addresses. + * @param remoteCommsOptions - Optional additional options for initRemoteComms. * @returns Object with all setup data including URLs and references. */ export async function setupAliceAndBob( @@ -189,6 +194,7 @@ export async function setupAliceAndBob( kernelStore1: ReturnType, kernelStore2: ReturnType, relays: string[], + remoteCommsOptions?: Omit, ): Promise<{ aliceURL: string; bobURL: string; @@ -197,8 +203,8 @@ export async function setupAliceAndBob( peerId1: string; peerId2: string; }> { - await kernel1.initRemoteComms({ relays }); - await kernel2.initRemoteComms({ relays }); + await kernel1.initRemoteComms({ relays, ...remoteCommsOptions }); + await kernel2.initRemoteComms({ relays, ...remoteCommsOptions }); const aliceConfig = makeRemoteVatConfig('Alice'); const bobConfig = makeRemoteVatConfig('Bob'); diff --git a/packages/ocap-kernel/src/remotes/platform/constants.ts b/packages/ocap-kernel/src/remotes/platform/constants.ts index 608a3ebe7..ca66968fc 100644 --- a/packages/ocap-kernel/src/remotes/platform/constants.ts +++ b/packages/ocap-kernel/src/remotes/platform/constants.ts @@ -15,3 +15,15 @@ export const DEFAULT_WRITE_TIMEOUT_MS = 10_000; /** SCTP user initiated abort code (RFC 4960) */ export const SCTP_USER_INITIATED_ABORT = 12; + +/** Default message rate limit: 100 messages per second per peer */ +export const DEFAULT_MESSAGE_RATE_LIMIT = 100; + +/** Default message rate window in milliseconds (1 second) */ +export const DEFAULT_MESSAGE_RATE_WINDOW_MS = 1000; + +/** Default connection attempt rate limit: 10 attempts per minute per peer */ +export const DEFAULT_CONNECTION_RATE_LIMIT = 10; + +/** Default connection rate window in milliseconds (1 minute) */ +export const DEFAULT_CONNECTION_RATE_WINDOW_MS = 60_000; diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts new file mode 100644 index 000000000..57f443113 --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.test.ts @@ -0,0 +1,294 @@ +import { ResourceLimitError } from '@metamask/kernel-errors'; +import { describe, it, expect, beforeEach } from 'vitest'; + +import { + DEFAULT_CONNECTION_RATE_LIMIT, + DEFAULT_CONNECTION_RATE_WINDOW_MS, + DEFAULT_MESSAGE_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_WINDOW_MS, +} from './constants.ts'; +import { + SlidingWindowRateLimiter, + makeMessageRateLimiter, + makeConnectionRateLimiter, +} from './rate-limiter.ts'; + +describe('SlidingWindowRateLimiter', () => { + let limiter: SlidingWindowRateLimiter; + + beforeEach(() => { + // Create a limiter allowing 3 events per 100ms window for faster tests + limiter = new SlidingWindowRateLimiter(3, 100); + }); + + describe('wouldExceedLimit', () => { + it('returns false when no events recorded', () => { + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); + + it('returns false when under the limit', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); + + it('returns true when at the limit', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('tracks limits independently per key', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + expect(limiter.wouldExceedLimit('peer2')).toBe(false); + }); + + it('allows events after window expires', async () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + + // Wait for window to expire + await new Promise((resolve) => setTimeout(resolve, 110)); + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); + }); + + describe('recordEvent', () => { + it('records events for a key', () => { + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + + it('accumulates events', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(2); + }); + + it('prunes old events when recording', async () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 110)); + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); + + describe('checkAndRecord', () => { + it('records event when under limit', () => { + limiter.checkAndRecord('peer1', 'messageRate'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + + it('throws ResourceLimitError when limit exceeded', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + + expect(() => limiter.checkAndRecord('peer1', 'messageRate')).toThrow( + ResourceLimitError, + ); + }); + + it('includes limit details in error', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + + let caughtError: ResourceLimitError | undefined; + try { + limiter.checkAndRecord('peer1', 'messageRate'); + } catch (error) { + caughtError = error as ResourceLimitError; + } + + expect(caughtError).toBeInstanceOf(ResourceLimitError); + expect(caughtError?.data).toStrictEqual({ + limitType: 'messageRate', + current: 3, + limit: 3, + }); + }); + + it('does not record when limit exceeded', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + + try { + limiter.checkAndRecord('peer1', 'messageRate'); + } catch { + // Expected + } + + expect(limiter.getCurrentCount('peer1')).toBe(3); + }); + }); + + describe('getCurrentCount', () => { + it('returns 0 for unknown key', () => { + expect(limiter.getCurrentCount('unknown')).toBe(0); + }); + + it('returns count of events within window', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(2); + }); + + it('excludes events outside window', async () => { + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 60)); + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 50)); + // First event is now outside window (110ms ago) + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); + + describe('clearKey', () => { + it('removes all events for a key', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + limiter.clearKey('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(0); + }); + + it('does not affect other keys', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer2'); + limiter.clearKey('peer1'); + expect(limiter.getCurrentCount('peer1')).toBe(0); + expect(limiter.getCurrentCount('peer2')).toBe(1); + }); + }); + + describe('clear', () => { + it('removes all events for all keys', () => { + limiter.recordEvent('peer1'); + limiter.recordEvent('peer2'); + limiter.clear(); + expect(limiter.getCurrentCount('peer1')).toBe(0); + expect(limiter.getCurrentCount('peer2')).toBe(0); + }); + }); + + describe('pruneStale', () => { + it('removes keys with no recent events', async () => { + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 110)); + limiter.pruneStale(); + expect(limiter.getCurrentCount('peer1')).toBe(0); + }); + + it('keeps keys with recent events', () => { + limiter.recordEvent('peer1'); + limiter.pruneStale(); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + + it('prunes old events from active keys', async () => { + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 60)); + limiter.recordEvent('peer1'); + await new Promise((resolve) => setTimeout(resolve, 50)); + limiter.pruneStale(); + // Only the second event should remain (first is >100ms old) + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); + + describe('sliding window behavior', () => { + it('allows burst followed by sustained rate', async () => { + // Burst 3 events + limiter.checkAndRecord('peer1', 'messageRate'); + limiter.checkAndRecord('peer1', 'messageRate'); + limiter.checkAndRecord('peer1', 'messageRate'); + + // Should be at limit + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + + // Wait for first event to expire + await new Promise((resolve) => setTimeout(resolve, 110)); + + // Now slots available + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + limiter.checkAndRecord('peer1', 'messageRate'); + expect(limiter.getCurrentCount('peer1')).toBe(1); + }); + }); +}); + +describe('makeMessageRateLimiter', () => { + it('creates limiter with default settings', () => { + const limiter = makeMessageRateLimiter(); + + // Should allow DEFAULT_MESSAGE_RATE_LIMIT events + for (let idx = 0; idx < DEFAULT_MESSAGE_RATE_LIMIT; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('creates limiter with custom rate', () => { + const limiter = makeMessageRateLimiter(5); + + for (let idx = 0; idx < 5; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('uses 1 second window', async () => { + // Use a small limit to make test faster + const limiter = makeMessageRateLimiter(2); + + limiter.recordEvent('peer1'); + limiter.recordEvent('peer1'); + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + + // Window is 1 second, so after 1 second events should be allowed + await new Promise((resolve) => + setTimeout(resolve, DEFAULT_MESSAGE_RATE_WINDOW_MS + 10), + ); + expect(limiter.wouldExceedLimit('peer1')).toBe(false); + }); +}); + +describe('makeConnectionRateLimiter', () => { + it('creates limiter with default settings', () => { + const limiter = makeConnectionRateLimiter(); + + // Should allow DEFAULT_CONNECTION_RATE_LIMIT events + for (let idx = 0; idx < DEFAULT_CONNECTION_RATE_LIMIT; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + it('creates limiter with custom rate', () => { + const limiter = makeConnectionRateLimiter(3); + + for (let idx = 0; idx < 3; idx++) { + limiter.recordEvent('peer1'); + } + expect(limiter.wouldExceedLimit('peer1')).toBe(true); + }); + + // Skip window expiration test for connection limiter as it would take 60 seconds +}); + +describe('constants', () => { + it('exports expected default values', () => { + expect(DEFAULT_MESSAGE_RATE_LIMIT).toBe(100); + expect(DEFAULT_MESSAGE_RATE_WINDOW_MS).toBe(1000); + expect(DEFAULT_CONNECTION_RATE_LIMIT).toBe(10); + expect(DEFAULT_CONNECTION_RATE_WINDOW_MS).toBe(60_000); + }); +}); diff --git a/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts new file mode 100644 index 000000000..b3e79a19c --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/rate-limiter.ts @@ -0,0 +1,188 @@ +import { ResourceLimitError } from '@metamask/kernel-errors'; + +import { + DEFAULT_CONNECTION_RATE_LIMIT, + DEFAULT_CONNECTION_RATE_WINDOW_MS, + DEFAULT_MESSAGE_RATE_LIMIT, + DEFAULT_MESSAGE_RATE_WINDOW_MS, +} from './constants.ts'; + +/** + * A sliding window rate limiter that tracks event counts per key within a time window. + * Events older than the window are automatically pruned when checking or recording. + */ +export class SlidingWindowRateLimiter { + readonly #maxEvents: number; + + readonly #windowMs: number; + + readonly #timestamps: Map; + + /** + * Create a new sliding window rate limiter. + * + * @param maxEvents - Maximum number of events allowed within the window. + * @param windowMs - Window size in milliseconds. + */ + constructor(maxEvents: number, windowMs: number) { + this.#maxEvents = maxEvents; + this.#windowMs = windowMs; + this.#timestamps = new Map(); + } + + /** + * Check if an event would exceed the rate limit for a given key. + * This does not record the event. + * + * @param key - The key to check (e.g., peer ID). + * @returns True if the event would exceed the rate limit. + */ + wouldExceedLimit(key: string): boolean { + const now = Date.now(); + const cutoff = now - this.#windowMs; + const timestamps = this.#timestamps.get(key); + + if (!timestamps) { + return false; + } + + // Count events within the window + const recentCount = timestamps.filter((ts) => ts > cutoff).length; + return recentCount >= this.#maxEvents; + } + + /** + * Record an event for a given key. + * Automatically prunes old timestamps outside the window. + * + * @param key - The key to record (e.g., peer ID). + */ + recordEvent(key: string): void { + const now = Date.now(); + const cutoff = now - this.#windowMs; + let timestamps = this.#timestamps.get(key); + + if (!timestamps) { + timestamps = []; + this.#timestamps.set(key, timestamps); + } + + // Prune old timestamps and add new one + const pruned = timestamps.filter((ts) => ts > cutoff); + pruned.push(now); + this.#timestamps.set(key, pruned); + } + + /** + * Check the rate limit and record the event if allowed. + * Throws ResourceLimitError if the limit would be exceeded. + * + * @param key - The key to check and record (e.g., peer ID). + * @param limitType - The type of limit for error reporting. + * @throws ResourceLimitError if the rate limit would be exceeded. + */ + checkAndRecord( + key: string, + limitType: 'messageRate' | 'connectionRate', + ): void { + if (this.wouldExceedLimit(key)) { + const timestamps = this.#timestamps.get(key) ?? []; + const cutoff = Date.now() - this.#windowMs; + const currentCount = timestamps.filter((ts) => ts > cutoff).length; + + throw new ResourceLimitError( + `Rate limit exceeded: ${currentCount}/${this.#maxEvents} ${limitType} in ${this.#windowMs}ms window`, + { + data: { + limitType, + current: currentCount, + limit: this.#maxEvents, + }, + }, + ); + } + this.recordEvent(key); + } + + /** + * Get the current count of events within the window for a key. + * + * @param key - The key to check. + * @returns The number of events within the current window. + */ + getCurrentCount(key: string): number { + const now = Date.now(); + const cutoff = now - this.#windowMs; + const timestamps = this.#timestamps.get(key); + + if (!timestamps) { + return 0; + } + + return timestamps.filter((ts) => ts > cutoff).length; + } + + /** + * Clear all recorded events for a specific key. + * + * @param key - The key to clear. + */ + clearKey(key: string): void { + this.#timestamps.delete(key); + } + + /** + * Clear all recorded events. + */ + clear(): void { + this.#timestamps.clear(); + } + + /** + * Prune old timestamps for all keys. + * Removes keys that have no recent events. + */ + pruneStale(): void { + const now = Date.now(); + const cutoff = now - this.#windowMs; + + for (const [key, timestamps] of this.#timestamps.entries()) { + const pruned = timestamps.filter((ts) => ts > cutoff); + if (pruned.length === 0) { + this.#timestamps.delete(key); + } else { + this.#timestamps.set(key, pruned); + } + } + } +} + +/** + * Factory function to create a message rate limiter. + * + * @param maxMessagesPerSecond - Maximum messages per second per peer. + * @returns A configured SlidingWindowRateLimiter for message rate limiting. + */ +export function makeMessageRateLimiter( + maxMessagesPerSecond: number = DEFAULT_MESSAGE_RATE_LIMIT, +): SlidingWindowRateLimiter { + return new SlidingWindowRateLimiter( + maxMessagesPerSecond, + DEFAULT_MESSAGE_RATE_WINDOW_MS, + ); +} + +/** + * Factory function to create a connection attempt rate limiter. + * + * @param maxAttemptsPerMinute - Maximum connection attempts per minute per peer. + * @returns A configured SlidingWindowRateLimiter for connection rate limiting. + */ +export function makeConnectionRateLimiter( + maxAttemptsPerMinute: number = DEFAULT_CONNECTION_RATE_LIMIT, +): SlidingWindowRateLimiter { + return new SlidingWindowRateLimiter( + maxAttemptsPerMinute, + DEFAULT_CONNECTION_RATE_WINDOW_MS, + ); +} diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts index 5c45729e2..732136f59 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.test.ts @@ -79,6 +79,7 @@ describe('reconnection-lifecycle', () => { dialPeer: vi.fn().mockResolvedValue(mockChannel), reuseOrReturnChannel: vi.fn().mockResolvedValue(mockChannel), checkConnectionLimit: vi.fn(), + checkConnectionRateLimit: vi.fn(), registerChannel: vi.fn(), } as unknown as ReconnectionLifecycleDeps; }); @@ -321,6 +322,48 @@ describe('reconnection-lifecycle', () => { expect(deps.checkConnectionLimit).toHaveBeenCalled(); }); + it('checks connection rate limit before dialing', async () => { + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + expect(deps.checkConnectionRateLimit).toHaveBeenCalledWith('peer1'); + }); + + it('continues loop on ResourceLimitError instead of giving up', async () => { + const { ResourceLimitError } = kernelErrors; + ( + deps.checkConnectionRateLimit as ReturnType + ).mockImplementationOnce(() => { + throw new ResourceLimitError('Rate limit exceeded', { + data: { limitType: 'connectionRate', current: 10, limit: 10 }, + }); + }); + + (deps.reconnectionManager.isReconnecting as ReturnType) + .mockReturnValueOnce(true) // First attempt - rate limited + .mockReturnValueOnce(true) // Second attempt - success + .mockReturnValueOnce(false); + + const lifecycle = makeReconnectionLifecycle(deps); + + await lifecycle.attemptReconnection('peer1'); + + // Should not call onRemoteGiveUp because rate limit is retryable + expect(deps.onRemoteGiveUp).not.toHaveBeenCalled(); + // Should have tried twice (once rate limited, once successful) + expect(deps.reconnectionManager.incrementAttempt).toHaveBeenCalledTimes( + 2, + ); + expect(deps.logger.log).toHaveBeenCalledWith( + expect.stringContaining('rate limited'), + ); + }); + it('logs reconnection attempts', async () => { (deps.reconnectionManager.isReconnecting as ReturnType) .mockReturnValueOnce(true) diff --git a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts index 5cc950f52..9a0653948 100644 --- a/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts +++ b/packages/ocap-kernel/src/remotes/platform/reconnection-lifecycle.ts @@ -1,4 +1,7 @@ -import { isRetryableNetworkError } from '@metamask/kernel-errors'; +import { + isRetryableNetworkError, + ResourceLimitError, +} from '@metamask/kernel-errors'; import { abortableDelay, DEFAULT_MAX_RETRY_ATTEMPTS, @@ -27,6 +30,7 @@ export type ReconnectionLifecycleDeps = { dialedChannel: Channel, ) => Promise; checkConnectionLimit: () => void; + checkConnectionRateLimit: (peerId: string) => void; registerChannel: ( peerId: string, channel: Channel, @@ -62,6 +66,7 @@ export function makeReconnectionLifecycle( dialPeer, reuseOrReturnChannel, checkConnectionLimit, + checkConnectionRateLimit, registerChannel, } = deps; @@ -124,6 +129,14 @@ export function makeReconnectionLifecycle( reconnectionManager.stopReconnection(peerId); return; } + // Rate limit errors are temporary - skip this attempt but continue the loop + // The backoff delay will naturally space out attempts + if (problem instanceof ResourceLimitError) { + logger.log( + `${peerId}:: reconnection attempt ${nextAttempt} rate limited, will retry after backoff`, + ); + continue; + } if (!isRetryableNetworkError(problem)) { outputError(peerId, `non-retryable failure`, problem); reconnectionManager.stopReconnection(peerId); @@ -151,6 +164,9 @@ export function makeReconnectionLifecycle( state: PeerState, peerId: string, ): Promise { + // Check connection rate limit before attempting dial + checkConnectionRateLimit(peerId); + const { locationHints: hints } = state; const dialedChannel = await dialPeer(peerId, hints); diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index dfc4e8282..dd36c1ea9 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -7,13 +7,19 @@ import { makeErrorLogger, writeWithTimeout } from './channel-utils.ts'; import { ConnectionFactory } from './connection-factory.ts'; import { DEFAULT_CLEANUP_INTERVAL_MS, + DEFAULT_CONNECTION_RATE_LIMIT, DEFAULT_MAX_CONCURRENT_CONNECTIONS, DEFAULT_MAX_MESSAGE_SIZE_BYTES, + DEFAULT_MESSAGE_RATE_LIMIT, DEFAULT_STALE_PEER_TIMEOUT_MS, DEFAULT_WRITE_TIMEOUT_MS, SCTP_USER_INITIATED_ABORT, } from './constants.ts'; import { PeerStateManager } from './peer-state-manager.ts'; +import { + makeConnectionRateLimiter, + makeMessageRateLimiter, +} from './rate-limiter.ts'; import { makeReconnectionLifecycle } from './reconnection-lifecycle.ts'; import { ReconnectionManager } from './reconnection.ts'; import { @@ -41,6 +47,8 @@ import type { * @param options.maxMessageSizeBytes - Maximum message size in bytes (default: 1MB). * @param options.cleanupIntervalMs - Stale peer cleanup interval in milliseconds (default: 15 minutes). * @param options.stalePeerTimeoutMs - Stale peer timeout in milliseconds (default: 1 hour). + * @param options.maxMessagesPerSecond - Maximum messages per second per peer (default: 100). + * @param options.maxConnectionAttemptsPerMinute - Maximum connection attempts per minute per peer (default: 10). * @param remoteMessageHandler - Handler to be called when messages are received from elsewhere. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote (after max retries or non-retryable error). * @@ -65,6 +73,8 @@ export async function initTransport( maxMessageSizeBytes = DEFAULT_MAX_MESSAGE_SIZE_BYTES, cleanupIntervalMs = DEFAULT_CLEANUP_INTERVAL_MS, stalePeerTimeoutMs = DEFAULT_STALE_PEER_TIMEOUT_MS, + maxMessagesPerSecond = DEFAULT_MESSAGE_RATE_LIMIT, + maxConnectionAttemptsPerMinute = DEFAULT_CONNECTION_RATE_LIMIT, } = options; let cleanupWakeDetector: (() => void) | undefined; const stopController = new AbortController(); @@ -78,6 +88,10 @@ export async function initTransport( maxConcurrentConnections, () => peerStateManager.countActiveConnections(), ); + const messageRateLimiter = makeMessageRateLimiter(maxMessagesPerSecond); + const connectionRateLimiter = makeConnectionRateLimiter( + maxConnectionAttemptsPerMinute, + ); let cleanupIntervalId: ReturnType | undefined; // Holder for handleConnectionLoss - initialized later after all dependencies are defined // This breaks the circular dependency between readChannel → handleConnectionLoss → registerChannel @@ -109,7 +123,12 @@ export async function initTransport( for (const peerId of stalePeers) { peerStateManager.removePeer(peerId); reconnectionManager.stopReconnection(peerId); + messageRateLimiter.clearKey(peerId); + connectionRateLimiter.clearKey(peerId); } + // Also prune stale rate limiter entries that may not have peer state + messageRateLimiter.pruneStale(); + connectionRateLimiter.pruneStale(); } /** @@ -297,6 +316,8 @@ export async function initTransport( connectionFactory.dialIdempotent(peerId, hints, false), reuseOrReturnChannel, checkConnectionLimit, + checkConnectionRateLimit: (peerId: string) => + connectionRateLimiter.checkAndRecord(peerId, 'connectionRate'), registerChannel, }); reconnectionHolder.handleConnectionLoss = @@ -326,6 +347,12 @@ export async function initTransport( // Validate message size before sending validateMessageSize(message); + // Check and record message rate limit atomically to prevent TOCTOU race + // Note: This records before send completes, so failed sends consume quota. + // This is intentional - recording after send would allow concurrent sends + // to bypass the rate limit via TOCTOU attacks. + messageRateLimiter.checkAndRecord(targetPeerId, 'messageRate'); + const state = peerStateManager.getState(targetPeerId); // Get or establish channel @@ -334,6 +361,9 @@ export async function initTransport( // Check connection limit before attempting to dial checkConnectionLimit(); + // Check connection attempt rate limit + connectionRateLimiter.checkAndRecord(targetPeerId, 'connectionRate'); + try { const { locationHints: hints } = state; channel = await connectionFactory.dialIdempotent( @@ -508,6 +538,8 @@ export async function initTransport( await connectionFactory.stop(); peerStateManager.clear(); reconnectionManager.clear(); + messageRateLimiter.clear(); + connectionRateLimiter.clear(); } // Return the sender with a stop handle and connection management functions diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index 3c72fbc73..e57a0d92a 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -69,6 +69,18 @@ export type RemoteCommsOptions = { * location hints, connection timestamps, and reconnection state. */ stalePeerTimeoutMs?: number | undefined; + /** + * Maximum messages per second per peer (default: 100). + * Messages exceeding this rate are rejected with ResourceLimitError. + * Uses a sliding 1-second window. + */ + maxMessagesPerSecond?: number | undefined; + /** + * Maximum connection attempts per minute per peer (default: 10). + * Connection attempts exceeding this rate are rejected with ResourceLimitError. + * Uses a sliding 1-minute window. + */ + maxConnectionAttemptsPerMinute?: number | undefined; }; export type RemoteInfo = {