refactor: Replace ProcessPoolExecutor with asyncio for evaluation #446
Draft
simonrosenberg wants to merge 8 commits into main from
Conversation
This refactors the evaluation orchestrator from ProcessPoolExecutor to asyncio with semaphore-based concurrency. This eliminates the 30× memory multiplication from having 30 worker processes, which was causing OOM failures in long-running SWTBench evaluations.

Changes:
- Use asyncio.Semaphore to limit concurrent instances
- Run sync SDK operations (workspace, conversation) via asyncio.to_thread()
- Single-process async concurrency for the I/O-bound workload
- Update tests for the new architecture
- Add pytest-asyncio for async test support

The workload is I/O-bound (HTTP calls to the LLM proxy and runtime API), so single-process async concurrency provides the same parallelism without the memory overhead of separate Python processes.

Fixes #441

Co-authored-by: openhands <openhands@all-hands.dev>
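The semaphore-plus-`asyncio.to_thread()` pattern described above can be sketched as follows (a minimal illustration, not the actual orchestrator code; `process_one_sync` stands in for the real per-instance work):

```python
import asyncio
import time


def process_one_sync(instance_id: str) -> str:
    # Stand-in for the blocking SDK work (workspace setup, conversation run).
    time.sleep(0.01)
    return f"done:{instance_id}"


async def run_all(instance_ids: list[str], num_workers: int = 30) -> list[str]:
    # At most num_workers instances run at once; the rest queue on the semaphore.
    semaphore = asyncio.Semaphore(num_workers)

    async def run_one(instance_id: str) -> str:
        async with semaphore:
            # The blocking call runs in a worker thread, keeping the event
            # loop free to schedule the other instances.
            return await asyncio.to_thread(process_one_sync, instance_id)

    return await asyncio.gather(*(run_one(i) for i in instance_ids))


results = asyncio.run(run_all([f"inst-{n}" for n in range(5)], num_workers=2))
```

Because the work is I/O-bound, the worker threads spend most of their time blocked on network calls, so a single Python process sustains the same effective parallelism as the old process pool.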
The asyncio refactor runs sync SDK operations in thread workers via
asyncio.to_thread(). Two thread-safety bugs caused crashes:
1. redirect_stdout_stderr() replaced the global sys.stdout with a log
file and restored it in a finally block. When multiple threads ran
concurrently, one thread closing its log file would crash others
still writing to it ("I/O operation on closed file").
Fix: Use threading.local() to store per-thread log files and a
_ThreadLocalWriter wrapper that delegates writes to the correct
file for each thread. The wrapper also catches ValueError and
falls back to the original stream.
2. os.environ["LMNR_SPAN_CONTEXT"] was written without
synchronisation, allowing one thread to overwrite another's span
context before it was consumed.
Fix: Protect the write with a threading.Lock().
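A hedged sketch of fix 2 — the helper name is illustrative, not taken from the actual diff:

```python
import os
import threading

_span_context_lock = threading.Lock()


def run_with_span_context(serialized_context: str, consume):
    # Hold the lock across both the write and the consuming call so another
    # thread cannot overwrite LMNR_SPAN_CONTEXT in between.
    with _span_context_lock:
        os.environ["LMNR_SPAN_CONTEXT"] = serialized_context
        return consume()
```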
Also: remove duplicate @pytest.mark.asyncio decorator, remove unused
sys import, run ruff format.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Three bugs caused 275/433 instances to produce no results:

1. Timeout starts at task creation, not semaphore acquisition (204 lost)
   All 433 asyncio tasks are created at once but only 30 run concurrently (semaphore). Instances queued behind the semaphore burned through their 4h timeout while waiting.
   Fix: Reset start_time to time.monotonic() inside the semaphore context, so the timeout counts from when the instance actually begins running.

2. lmnr_span.end() raises ValueError in worker threads (37 lost)
   Laminar spans use contextvars, which don't work across threads. lmnr_span.end() in the finally block raised a "Failed to detach context" ValueError, crashing the thread and discarding the return value (including error outputs from exhausted retries).
   Fix: Wrap lmnr_span.end() in try/except.

3. Generic Exception handler drops instances silently (exacerbates #2)
   When task.result() raised from bug #2, the except handler logged the error but did not create an error output or call on_result.
   Fix: Create error_output and call on_result in the generic Exception handler.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
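The timeout fix from bug 1 can be sketched as follows (an illustration under assumed names, not the actual evaluation.py code):

```python
import asyncio
import math
import time


def do_work(instance_id: str) -> str:
    # Stand-in for the real instance processing.
    time.sleep(0.01)
    return f"ok:{instance_id}"


async def run_one(instance_id: str, semaphore: asyncio.Semaphore):
    # While queued behind the semaphore, the deadline is effectively infinite.
    start_time = math.inf
    async with semaphore:
        # Reset the clock here: the timeout counts from when work actually
        # starts, not from when the task was created.
        start_time = time.monotonic()
        result = await asyncio.to_thread(do_work, instance_id)
        elapsed = time.monotonic() - start_time
        return result, elapsed


async def main():
    semaphore = asyncio.Semaphore(1)  # force instances to queue
    return await asyncio.gather(*(run_one(f"i{n}", semaphore) for n in range(3)))


results = asyncio.run(main())
```

With the reset inside the `async with`, an instance that waited hours in the queue still gets its full per-instance budget once it starts.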
setup_instance_logging() modified the root logger's handlers per call, removing all existing handlers and adding new instance-specific ones. In a multi-threaded environment (asyncio.to_thread workers), thread B would remove thread A's file handler, causing thread A's logs to go to thread B's file or be lost entirely.

Fix: Install a single pair of routing handlers on the root logger that delegate to per-thread state via threading.local():
- _ThreadRoutedFileHandler: routes records to the FileHandler stored in the current thread's _logging_local.file_handler
- _ThreadRoutedConsoleHandler: applies the formatter/filter stored in the current thread's _logging_local for console output

The routing handlers are installed once (protected by a lock), and subsequent calls to setup_instance_logging() only update the thread-local state.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
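A minimal sketch of the routing-handler idea (names taken from the commit description; simplified and self-contained):

```python
import io
import logging
import threading

_logging_local = threading.local()


class _ThreadRoutedFileHandler(logging.Handler):
    """Routes each record to whichever handler the current thread registered."""

    def emit(self, record: logging.LogRecord) -> None:
        handler = getattr(_logging_local, "file_handler", None)
        if handler is not None:
            handler.emit(record)


# Installed once; per-instance setup only updates the thread-local slot
# instead of mutating the shared handler list.
logger = logging.getLogger("eval-demo")
logger.setLevel(logging.INFO)
logger.addHandler(_ThreadRoutedFileHandler())

buffer = io.StringIO()
_logging_local.file_handler = logging.StreamHandler(buffer)
logger.info("instance started")
```

Because threads never touch the shared handler list after installation, one thread tearing down its logging can no longer redirect or drop another thread's records.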
- Set start_time=inf at task creation so instances waiting in the semaphore queue can't time out before they start running
- Add a fallback in the thread-routed logging handlers so main asyncio thread warnings/errors are written to stderr/stdout instead of silently dropped
- Retry instances with no output at all in a previous attempt, not just critic-flagged failures

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve conflicts: keep asyncio evaluation.py and console_logging.py from feature branch, take main's versions for all other files. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve conflicts by keeping the asyncio architecture from the feature branch while incorporating improvements from main:
- Update Laminar API to two-phase datapoint linking (create datapoint immediately for UI, link trace when worker starts)
- Add _cleanup_workspace helper method
- Add _execute_single_attempt method for cleaner retry logic
- Fix _get_instances_for_attempt to retry instances with no output in any prior attempt (never_completed set)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The _ThreadRoutedConsoleHandler was writing WARNING+ level messages to sys.__stdout__ when no thread-local formatter was set (i.e. for the main asyncio event loop thread). This caused OpenTelemetry context-detachment errors to be written to stdout, corrupting the JSON output that shell scripts parse with jq.

Changed the fallback to write to sys.__stderr__ instead, matching the behavior of _ThreadRoutedFileHandler. This fixes the "jq: parse error: Invalid numeric literal" failures in the gaia, swebench, and swtbench benchmarks.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Collaborator
This looks interesting! Testing it with GLM-5 and swebenchmultimodal, which has some deadlocks. https://github.com/OpenHands/software-agent-sdk/actions/runs/22772705315
Summary
This refactors the evaluation orchestrator from ProcessPoolExecutor to asyncio with semaphore-based concurrency, eliminating the 30× memory multiplication from having 30 worker processes, which was causing OOM failures in long-running SWTBench evaluations (as reported in #441).

Problem
Issue #441 documented that evaluation jobs continued to experience OOM kills on SWTBench despite the memory-leak fix in #433. Even with the parent-process fix, per-worker memory remained the problem: with 30 workers and an 8Gi container limit, that left ~267 MB on average per worker, and with retries and fragmentation, memory grew monotonically until OOM.
Solution
Since the workload is I/O-bound (HTTP calls to the LLM proxy + runtime API), we can use asyncio for concurrent instance processing. This provides:

- the same concurrency limit, still controlled by num_workers
- sync SDK operations run in worker threads via asyncio.to_thread()

Key Changes
evaluation.py: Refactored from ProcessPoolExecutor to asyncio
- _run_iterative_mode_async() method with asyncio event loop
- _run_attempt_async() with semaphore and task management
- _process_one_mp → _process_one_sync (runs in thread executor)
- asyncio.wait() with FIRST_COMPLETED

Tests: Updated for new architecture
- Added test_async_evaluation.py with 7 tests for asyncio patterns
- Updated test_workspace_cleanup.py to use _process_one_sync
- Removed test_keyboard_interrupt.py (was testing ProcessPoolExecutor-specific behavior)

Dependencies: Added pytest-asyncio for async test support

Testing
All 39 tests pass.
Pre-commit checks all pass (pyright, ruff, pycodestyle).
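The asyncio.wait()/FIRST_COMPLETED dispatch mentioned under Key Changes can be sketched like this (illustrative only; the real code dispatches instance tasks rather than toy jobs):

```python
import asyncio


async def do_work(n: int) -> int:
    # Faster jobs complete first, regardless of submission order.
    await asyncio.sleep(0.01 * n)
    return n * n


async def dispatch(jobs: list[int]) -> list[int]:
    pending = {asyncio.create_task(do_work(n)) for n in jobs}
    results = []
    # Harvest results as each task finishes instead of waiting for all of
    # them, so completed instances can be reported immediately.
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            results.append(task.result())
    return results


results = asyncio.run(dispatch([3, 1, 2]))
```

This mirrors what the old ProcessPoolExecutor loop did with as_completed-style consumption, but inside a single process.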
Memory Impact
The asyncio approach uses a single Python process with thread executors, eliminating the memory multiplication from separate Python interpreters.
Backward Compatibility
- Evaluation.run() still works the same way
- num_workers still controls concurrency

Fixes #441
Tracking issue: OpenHands/evaluation#304
Umbrella issue: OpenHands/evaluation#303