Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR introduces a shared, atomic scan limit that is enforced across all parallel scan instances (and their scanners) so that total produced rows respect the SQL LIMIT globally rather than per-instance.
Changes:
- Thread a shared
std::atomic<int64_t>limit counter fromScanOperatorXintoScannerContext. - Enforce the shared limit in
ScannerScheduler::_scanner_scan()by discarding/truncating blocks when quota is exhausted. - Update unit tests to pass a
shared_limitpointer intoScannerContext::create_shared().
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| be/test/exec/scan/scanner_context_test.cpp | Updates test setup to provide the new shared-limit parameter to ScannerContext. |
| be/src/exec/scan/scanner_scheduler.cpp | Stops scanning and truncates/discards blocks when the shared limit quota is exhausted. |
| be/src/exec/scan/scanner_context.h | Extends ScannerContext API/state with a pointer to a shared limit and quota acquisition helpers. |
| be/src/exec/scan/scanner_context.cpp | Implements atomic quota acquisition and finishes early when shared limit is exhausted. |
| be/src/exec/operator/scan_operator.h | Adds _shared_scan_limit to ScanOperatorX to share remaining limit across instances. |
| be/src/exec/operator/scan_operator.cpp | Initializes _shared_scan_limit and passes it into ScannerContext. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Points to the shared remaining limit on ScanOperatorX, shared across all | ||
| // parallel instances and their scanners. -1 means no limit. | ||
| std::atomic<int64_t>* _shared_scan_limit = nullptr; |
There was a problem hiding this comment.
remaining_limit() unconditionally dereferences _shared_scan_limit, which is default-initialized to nullptr and is also used throughout the implementation (e.g., quota acquisition / finish checks). If any call path constructs a ScannerContext without a valid shared limit pointer (or with a pointer whose lifetime ends early), this will crash. Make _shared_scan_limit a non-null invariant (e.g., store a reference, or enforce non-null via constructor + DCHECK(_shared_scan_limit != nullptr)), and document/guarantee that the pointed-to atomic outlives the ScannerContext.
| // Points to the shared remaining limit on ScanOperatorX, shared across all | |
| // parallel instances and their scanners. -1 means no limit. | |
| std::atomic<int64_t>* _shared_scan_limit = nullptr; | |
| // Default shared remaining limit used to preserve a non-null invariant for | |
| // _shared_scan_limit. A value of -1 means "no limit". | |
| inline static std::atomic<int64_t> _default_shared_scan_limit {-1}; | |
| // Points to the shared remaining limit on ScanOperatorX, shared across all | |
| // parallel instances and their scanners. This pointer must always be non-null; | |
| // if a caller does not provide one explicitly, it falls back to | |
| // _default_shared_scan_limit. Any externally supplied atomic must outlive this | |
| // ScannerContext. A value of -1 means no limit. | |
| std::atomic<int64_t>* _shared_scan_limit = &_default_shared_scan_limit; |
| std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_shared<CgroupV2CpuCtl>(1); | ||
| std::unique_ptr<ScannerScheduler> scan_scheduler = | ||
| std::make_unique<ThreadPoolSimplifiedScanScheduler>("ForTest", cgroup_cpu_ctl); | ||
| std::atomic<int64_t> shared_limit {-1}; |
There was a problem hiding this comment.
Tests now pass a shared_limit pointer but keep it at -1 (no limit). This means the newly added shared-limit enforcement paths (quota acquisition, truncation/discard-on-exhaustion, early finish when remaining==0) are not exercised by these unit tests. To cover the new behavior, initialize shared_limit from limit in tests that set a positive limit, and add at least one case asserting: (1) blocks are truncated when the remaining quota is smaller than a block, and (2) scanning stops/finishes when quota reaches 0.
| std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared( | ||
| state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, | ||
| scanners, limit, scan_dependency, parallel_tasks); | ||
| scanners, limit, scan_dependency, &shared_limit, parallel_tasks); |
There was a problem hiding this comment.
Tests now pass a shared_limit pointer but keep it at -1 (no limit). This means the newly added shared-limit enforcement paths (quota acquisition, truncation/discard-on-exhaustion, early finish when remaining==0) are not exercised by these unit tests. To cover the new behavior, initialize shared_limit from limit in tests that set a positive limit, and add at least one case asserting: (1) blocks are truncated when the remaining quota is smaller than a block, and (2) scanning stops/finishes when quota reaches 0.
| const RowDescriptor* output_row_descriptor, | ||
| const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_, | ||
| std::shared_ptr<Dependency> dependency | ||
| std::shared_ptr<Dependency> dependency, std::atomic<int64_t>* shared_scan_limit |
There was a problem hiding this comment.
Passing a raw pointer for shared_scan_limit makes lifetime requirements implicit and easy to violate (dangling pointer risk), especially since ScannerContext instances can outlive some surrounding scaffolding during shutdown/cancellation. Consider making the API express ownership/lifetime explicitly (e.g., std::shared_ptr<std::atomic<int64_t>>, or a small shared state object), or accept/store a reference with a clearly documented guarantee that the referenced atomic outlives ScannerContext.
| std::shared_ptr<Dependency> dependency, std::atomic<int64_t>* shared_scan_limit | |
| std::shared_ptr<Dependency> dependency, | |
| std::shared_ptr<std::atomic<int64_t>> shared_scan_limit |
pick from #61617
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)