Skip to content

fix: true parallel tool execution via subscribeOn(Schedulers.io()) in concatMapEager#1154

Open
YuqiGuo105 wants to merge 1 commit intogoogle:mainfrom
YuqiGuo105:fix/parallel-tool-execution-1152
Open

fix: true parallel tool execution via subscribeOn(Schedulers.io()) in concatMapEager#1154
YuqiGuo105 wants to merge 1 commit intogoogle:mainfrom
YuqiGuo105:fix/parallel-tool-execution-1152

Conversation

@YuqiGuo105
Copy link
Copy Markdown
Contributor

@YuqiGuo105 YuqiGuo105 commented Apr 22, 2026

Summary

Fixes #1152ToolExecutionMode.PARALLEL was silently executing tools serially. Adding .subscribeOn(Schedulers.io()) inside concatMapEager dispatches each tool's full pipeline to an IO thread, enabling true concurrent execution.


Root Cause & Fix

Without subscribeOn, all subscriptions run on the calling thread. Blocking tools hold that thread, preventing the next subscription from starting.

// Before (broken) — all subscriptions on calling thread
Observable.fromIterable(validFunctionCalls)
    .concatMapEager(call -> functionCallMapper.apply(call).toObservable());

// After (fixed) — each tool dispatched to an IO thread
Observable.fromIterable(validFunctionCalls)
    .concatMapEager(call ->
        functionCallMapper.apply(call).toObservable().subscribeOn(Schedulers.io()));

Applied identically in both handleFunctionCalls and handleFunctionCallsLive.


Execution Workflow

Before — serial on calling thread

Time ──────────────────────────────────────────────────────────────►

Calling thread:
  [=== tool_1 (1000 ms) ===][=== tool_2 (1000 ms) ===][=== tool_3 (1000 ms) ===]
                                                                           Total: ~3000 ms

After — concurrent on IO thread pool

Time ──────────────────────────────────────────────────────────────►

Calling thread: [dispatch][dispatch][dispatch][wait ...]
IO thread-1:    [======== tool_1 (1000 ms) ========]
IO thread-2:    [======== tool_2 (1000 ms) ========]
IO thread-3:    [======== tool_3 (1000 ms) ========]
                                                    ▲
                                            Total: ~1000 ms

concatMapEager guarantees results are emitted in input order despite concurrent execution.


Tests

All 26 tests pass (Tests run: 26, Failures: 0, Errors: 0). 12 new tests added:

# Test method Tool Mode Assertion
1 handleFunctionCalls_parallelMode_runsToolsConcurrently SlowTool ×2 @ 500 ms PARALLEL duration < 900 ms
2 handleFunctionCalls_parallelMode_preservesInputOrder SlowTool slow(500ms), fast(100ms) PARALLEL response[0] == slow tool
3 handleFunctionCalls_noneModeDefaultsToParallel_runsToolsConcurrently SlowTool ×2 @ 500 ms NONE duration < 900 ms
4 handleFunctionCalls_sequentialMode_runsToolsSerially SlowTool ×2 @ 300 ms SEQUENTIAL duration ≥ 600 ms
5 handleFunctionCallsLive_parallelMode_runsToolsConcurrently SlowTool ×2 @ 500 ms PARALLEL (Live path) duration < 900 ms
6 handleFunctionCalls_parallelMode_threeTools_allStartConcurrently SlowTool ×3 @ 500 ms PARALLEL start times within 150 ms spread
7 handleFunctionCalls_blockingTools_parallel_executesInParallel BlockingTool ×3 @ 1000 ms PARALLEL duration < 1500 ms
8 handleFunctionCalls_blockingTools_defaultNoneMode_executesInParallel BlockingTool ×2 @ 1000 ms NONE duration < 1500 ms
9 handleFunctionCalls_blockingTools_sequential_executesSerially BlockingTool ×2 @ 500 ms SEQUENTIAL duration ≥ 1000 ms
10 handleFunctionCalls_blockingTools_parallel_preservesOrder BlockingTool slow(800ms), fast(100ms) PARALLEL response[0] == slow tool
11 handleFunctionCalls_blockingTools_parallel_usesMultipleThreads BlockingTool ×2 @ 500 ms PARALLEL thread1 ≠ thread2
12 handleFunctionCalls_nonBlockingTools_parallel_stillWorksCorrectly EchoTool ×3 PARALLEL correct responses in order (regression)

@hemasekhar-p
Copy link
Copy Markdown

Hi @YuqiGuo105, Thank you for your contribution! We appreciate you taking the time to submit this pull request. I have noticed The Maven build is failing due to test case failures in this PR. Could you please address these failure?

@hemasekhar-p hemasekhar-p added the waiting on reporter Waiting for reaction by reporter. Failing that, maintainers will eventually closed it as stale. label Apr 22, 2026
@hemasekhar-p hemasekhar-p self-assigned this Apr 22, 2026
@YuqiGuo105 YuqiGuo105 force-pushed the fix/parallel-tool-execution-1152 branch from 447c088 to 1db5e45 Compare April 22, 2026 15:18
@YuqiGuo105
Copy link
Copy Markdown
Contributor Author

Hi @hemasekhar-p, the failed test case is fixed.

Reason

Every pre-existing test that failed involved exactly one tool call. When the parallel path (concatMapEager + subscribeOn(Schedulers.io())) was applied unconditionally, a single tool's execution was still dispatched to an IO thread. This caused two async timing problems:

  1. OTel span not yet endeddoFinally(span.end()) runs on the IO thread after blockingGet() returns on the main thread, so the test reads spans before they're exported.
  2. Error not yet delivered — the tool error propagates asynchronously on the IO thread, arriving after the test's assertError() already ran (seeing errors=0).

Fix

if (invocationContext.runConfig().toolExecutionMode() == ToolExecutionMode.SEQUENTIAL
    || validFunctionCalls.size() <= 1) {

The || validFunctionCalls.size() <= 1 guard routes single-tool calls to concatMapMaybe, which runs entirely on the calling thread — no IO dispatch, no async race.

Since parallel vs. sequential is meaningless for a single task, this is semantically correct and has no functional impact.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

waiting on reporter Waiting for reaction by reporter. Failing that, maintainers will eventually closed it as stale.

Projects

None yet

2 participants