[SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path #55201

Open
anblanco wants to merge 1 commit into apache:branch-4.1 from anblanco:fix/SPARK-53759-backport-4.1

Conversation


anblanco commented Apr 5, 2026

What changes were proposed in this pull request?

Add an explicit sock_file.flush() in a finally block wrapping the main() call in worker.py's __main__ block.

The simple-worker codepath (used on Windows and when spark.python.use.daemon=false) calls main(sock_file, sock_file) as the last statement. When main() returns, the process exits without flushing the BufferedRWPair write buffer. On Python 3.12+, changed GC finalization ordering (cpython#97922) causes the underlying socket to close before BufferedRWPair.__del__ can flush, resulting in data loss and EOFException on the JVM side.

This mirrors the existing pattern in daemon.py's worker() function, which already has outfile.flush() in its finally block (~line 95).

Also adds a regression test (SimpleWorkerFlushTest) that exercises the simple-worker path with daemon=false.
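The patched pattern can be sketched with a local socket pair standing in for the JVM connection (a minimal sketch; `jvm_side`/`worker_side` and the stub `main()` are illustrative names, not the real worker.py code):

```python
import socket

# Local stand-in for the JVM <-> worker connection.
jvm_side, worker_side = socket.socketpair()
sock_file = worker_side.makefile("rwb")  # an io.BufferedRWPair, as in worker.py


def main(infile, outfile):
    # Stub for pyspark.worker.main: buffers a result destined for the JVM.
    outfile.write(b"RESULT")


try:
    main(sock_file, sock_file)
finally:
    # The fix: flush explicitly instead of relying on
    # BufferedRWPair.__del__ during interpreter shutdown.
    sock_file.flush()

print(jvm_side.recv(16))
```

Without the `finally` flush, the bytes written by `main()` can stay in the userspace buffer when the process exits.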

Why are the changes needed?

The simple-worker path crashes on Python 3.12+ with EOFException:

  • Windows: Always uses simple-worker (os.fork() unavailable) — this crash is reproducible on every worker-dependent operation with Python 3.12+
  • Linux/macOS: Reproducible when spark.python.use.daemon=false

Every worker-dependent operation (rdd.map(), createDataFrame(), UDFs) fails with:

org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
Caused by: java.io.EOFException

This crash reproduces on all currently supported PySpark releases (3.5.8, 4.0.2, 4.1.1) with Python 3.12 and 3.13.

Root cause: The daemon path (daemon.py) wraps worker_main() in try/finally with outfile.flush(). The simple-worker path (worker.py __main__) does not — it relies on BufferedRWPair.__del__ during interpreter shutdown, which is not guaranteed and no longer works as expected on Python 3.12+ due to changed GC finalization ordering.
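The failure mode is observable in isolation: bytes written through a buffered socket file are invisible to the peer until `flush()`, so if the raw socket is finalized first (the Python 3.12+ ordering), they are simply lost. A minimal sketch, using a local socket pair in place of the JVM connection:

```python
import socket

jvm_side, worker_side = socket.socketpair()
sock_file = worker_side.makefile("rwb")  # buffered, like worker.py's sock_file

sock_file.write(b"RESULT")  # sits in the BufferedRWPair write buffer
jvm_side.setblocking(False)
try:
    jvm_side.recv(16)
    print("data arrived without flush")
except BlockingIOError:
    # What the JVM-side reader sees before any flush: no bytes at all.
    print("nothing on the wire yet")

sock_file.flush()  # only now do the bytes reach the peer
print(jvm_side.recv(16))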

Does this PR introduce any user-facing change?

Yes. Resolves a crash that prevents PySpark from functioning on Windows with Python 3.12+, and on Linux/macOS with spark.python.use.daemon=false on Python 3.12+.

How was this patch tested?

  • Added SimpleWorkerFlushTest — integration test running rdd.map().collect() with spark.python.use.daemon=false
  • Red/green verified against pip-installed PySpark 4.1.1 on Python 3.12.10 (Windows 11):
    • Without fix: test FAILS deterministically (Python worker exited unexpectedly)
    • With fix (pyspark.zip patched): test PASSES
  • Verification matrix from standalone reproducer (anblanco/spark53759-reproducer):
| Platform             | Python  | Unpatched | Patched         |
| -------------------- | ------- | --------- | --------------- |
| Windows 11           | 3.11.9  | PASS      | PASS (harmless) |
| Windows 11           | 3.12.10 | FAIL      | PASS            |
| Windows 11           | 3.13.3  | FAIL      | PASS            |
| Linux (Ubuntu 24.04) | 3.12.3  | FAIL      | PASS            |

Note on master: SPARK-55665 refactored worker.py to use a get_sock_file_to_executor() context manager with close() in the finally block. Since BufferedRWPair.close() flushes internally, master is already covered by the structural change. This backport targets the released code structure which lacks that refactoring.
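The reason master needs no separate fix is that `BufferedRWPair.close()` flushes the write buffer before closing, which a short sketch can demonstrate (local socket pair as a stand-in for the JVM connection):

```python
import socket

jvm_side, worker_side = socket.socketpair()
sock_file = worker_side.makefile("rwb")

sock_file.write(b"RESULT")  # buffered, not yet on the wire
sock_file.close()           # close() flushes the writer before closing

print(jvm_side.recv(16))
```

So a `close()` in a `finally` block (as master's context manager provides) covers the flush; the released branches lack that structure, hence the explicit `flush()` in this backport.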

This fix cherry-picks cleanly to branch-4.0 and branch-3.5 (identical code).

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.6 (Anthropic)

The simple-worker codepath (used on Windows and when
spark.python.use.daemon=false) does not flush the socket file
before the process exits. On Python 3.12+, changed GC finalization
ordering causes the underlying socket to close before
BufferedRWPair.__del__ can flush the write buffer, resulting in
data loss and EOFException on the JVM side.

This adds an explicit flush() in a finally block wrapping the
main() call in worker.py's __main__ block, mirroring the existing
pattern in daemon.py's worker() function which already has
outfile.flush() in its finally block.

Also adds a regression test (SimpleWorkerFlushTest) that exercises
the simple-worker path with daemon=false.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
anblanco force-pushed the fix/SPARK-53759-backport-4.1 branch from 5a9215c to fd26afc on April 5, 2026 at 06:53
