[SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path #55201
Open

anblanco wants to merge 1 commit into apache:branch-4.1
The simple-worker codepath (used on Windows and when `spark.python.use.daemon=false`) does not flush the socket file before the process exits. On Python 3.12+, changed GC finalization ordering causes the underlying socket to close before `BufferedRWPair.__del__` can flush the write buffer, resulting in data loss and an `EOFException` on the JVM side. This adds an explicit `flush()` in a `finally` block wrapping the `main()` call in `worker.py`'s `__main__` block, mirroring the existing pattern in `daemon.py`'s `worker()` function, which already has `outfile.flush()` in its `finally` block. Also adds a regression test (`SimpleWorkerFlushTest`) that exercises the simple-worker path with `daemon=false`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
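A hedged sketch of the try/finally pattern this commit adds. The `main` function and the executor socket are stand-ins (the real `worker.py` obtains `sock_file` from the socket the JVM passes to the worker); only the wrapping `try`/`finally` with the explicit `flush()` is the pattern being illustrated:

```python
import socket


def main(infile, outfile):
    # Stand-in for pyspark.worker.main: writes a result but does not flush.
    outfile.write(b"OK")


# Stand-in for the worker's socket back to the JVM executor.
worker_sock, executor_sock = socket.socketpair()
sock_file = worker_sock.makefile("rwb")  # a BufferedRWPair, as in worker.py

# The change proposed by this PR: wrap the main() call in try/finally so the
# write buffer is always flushed, instead of relying on BufferedRWPair.__del__
# during interpreter shutdown (no longer reliable on Python 3.12+).
try:
    main(sock_file, sock_file)
finally:
    sock_file.flush()

received = executor_sock.recv(16)  # the flushed bytes reach the peer
```

This matches the shape of the existing `daemon.py` pattern described above, where `outfile.flush()` already sits in a `finally` block.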
### What changes were proposed in this pull request?

Add an explicit `sock_file.flush()` in a `finally` block wrapping the `main()` call in `worker.py`'s `__main__` block.

The simple-worker codepath (used on Windows and when `spark.python.use.daemon=false`) calls `main(sock_file, sock_file)` as the last statement. When `main()` returns, the process exits without flushing the `BufferedRWPair` write buffer. On Python 3.12+, changed GC finalization ordering (cpython#97922) causes the underlying socket to close before `BufferedRWPair.__del__` can flush, resulting in data loss and an `EOFException` on the JVM side.

This mirrors the existing pattern in `daemon.py`'s `worker()` function, which already has `outfile.flush()` in its `finally` block (~line 95).

Also adds a regression test (
`SimpleWorkerFlushTest`) that exercises the simple-worker path with `daemon=false`.

### Why are the changes needed?

The simple-worker path crashes on Python 3.12+ with `EOFException`:

- on Windows, where the daemon is unavailable (`os.fork()` unavailable); the crash is reproducible on every worker-dependent operation with Python 3.12+
- on Linux/macOS with `spark.python.use.daemon=false`

Every worker-dependent operation (`rdd.map()`, `createDataFrame()`, UDFs) fails. This crash reproduces on all currently supported PySpark releases (3.5.8, 4.0.2, 4.1.1) with Python 3.12 and 3.13.
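The underlying mechanics can be shown without Spark at all: a buffered socket write is not delivered until it is flushed, so if the socket closes before the buffer is flushed, the peer sees EOF. A minimal standalone sketch (plain sockets, no PySpark; this only illustrates the buffering behavior, not the actual worker code):

```python
import socket

a, b = socket.socketpair()
out = a.makefile("wb")  # buffered writer, analogous to the worker's sock_file

out.write(b"RESULT")    # small write: stays in the Python-level buffer

# Nothing has reached the peer yet, so a non-blocking recv finds no data.
b.setblocking(False)
try:
    pending = b.recv(16)
except BlockingIOError:
    pending = b""       # unflushed buffer: the peer would see EOF on close

out.flush()             # the explicit flush this PR adds to the worker path
b.setblocking(True)
received = b.recv(16)   # now the bytes arrive
```

On Python 3.12+ the interpreter may close the underlying socket before the buffer's `__del__` runs, which is why relying on implicit flush at shutdown fails in the simple-worker path.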
Root cause: the daemon path (`daemon.py`) wraps `worker_main()` in `try`/`finally` with `outfile.flush()`. The simple-worker path (`worker.py` `__main__`) does not; it relies on `BufferedRWPair.__del__` during interpreter shutdown, which is not guaranteed and no longer works as expected on Python 3.12+ due to changed GC finalization ordering.

### Does this PR introduce any user-facing change?

Yes. It resolves a crash that prevents PySpark from functioning on Windows with Python 3.12+, and on Linux/macOS with `spark.python.use.daemon=false` on Python 3.12+.

### How was this patch tested?
- `SimpleWorkerFlushTest`: integration test running `rdd.map().collect()` with `spark.python.use.daemon=false`
- Without the fix, the test fails (`Python worker exited unexpectedly`)

Note on master: SPARK-55665 refactored `worker.py` to use a `get_sock_file_to_executor()` context manager with `close()` in the `finally` block. Since `BufferedRWPair.close()` flushes internally, master is already covered by that structural change. This backport targets the released code structure, which lacks the refactoring.

This fix cherry-picks cleanly to `branch-4.0` and `branch-3.5` (identical code).

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.6 (Anthropic)