fix: Fix various bugs in async implementation of CometShuffleExternalSorter
#3195
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR addresses several concurrency and resource management issues in the async shuffle external sorter that could cause CPU waste, memory leaks, and improper cleanup during error conditions.
Builds on #3192
Changes
Problem: When the maximum number of concurrent spill tasks was reached, the code used a CPU-burning spin loop to check if any task completed.
Before:
After: Block on the oldest task using Future.get(), which properly yields the CPU while waiting.
Problem: If one async task threw an exception, subsequent tasks were not awaited, potentially leaving background threads running and resources unreleased.
After: Wait for ALL tasks to complete, collecting exceptions using addSuppressed() to preserve error information from multiple failures.
Problem: Used assert for validating threadNum > 0 (disabled in production) and no null check for thread pool.
After: Proper runtime checks with descriptive error messages:
Problem: If writeSortedFileNative() threw an exception, the SpillSorter remained in the spillingSorters queue with its memory unreleased.
After: Wrap in try-finally to ensure freeMemory(), freeArray(), and spillingSorters.remove() always execute.
Problem: When a task was killed or aborted, background spill threads continued running with no way to stop them.
After: Cancel all pending async tasks and wait briefly for their cleanup to complete before freeing memory and deleting spill files.
Problem: Field could have stale reads when accessed from different threads (main thread vs background spill threads).
After: Added volatile modifier to ensure visibility across threads.