This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new d90ac33b91d [Improvement](scan) Update scanner limit controller (#61617) (#61962)
d90ac33b91d is described below

commit d90ac33b91de5b64b40e27e8af1370bbef2f789f
Author: Pxl <[email protected]>
AuthorDate: Wed Apr 1 13:31:18 2026 +0800

    [Improvement](scan) Update scanner limit controller (#61617) (#61962)
    
    pick from #61617
---
 be/src/exec/operator/scan_operator.cpp     | 13 +++++----
 be/src/exec/operator/scan_operator.h       |  4 +++
 be/src/exec/scan/scanner_context.cpp       | 44 ++++++++++++++++++++++++++----
 be/src/exec/scan/scanner_context.h         |  8 +++++-
 be/src/exec/scan/scanner_scheduler.cpp     | 32 ++++++++++++++++------
 be/test/exec/scan/scanner_context_test.cpp | 29 ++++++++++----------
 6 files changed, 96 insertions(+), 34 deletions(-)

diff --git a/be/src/exec/operator/scan_operator.cpp b/be/src/exec/operator/scan_operator.cpp
index 06ef4557060..8842df4b098 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -1026,14 +1026,14 @@ template <typename Derived>
 Status ScanLocalState<Derived>::_start_scanners(
         const std::list<std::shared_ptr<ScannerDelegate>>& scanners) {
     auto& p = _parent->cast<typename Derived::Parent>();
-    _scanner_ctx = ScannerContext::create_shared(state(), this, 
p._output_tuple_desc,
-                                                 p.output_row_descriptor(), 
scanners, p.limit(),
-                                                 _scan_dependency
+    _scanner_ctx.store(ScannerContext::create_shared(state(), this, 
p._output_tuple_desc,
+                                                     
p.output_row_descriptor(), scanners, p.limit(),
+                                                     _scan_dependency, 
&p._shared_scan_limit
 #ifdef BE_TEST
-                                                 ,
-                                                 
max_scanners_concurrency(state())
+                                                     ,
+                                                     
max_scanners_concurrency(state())
 #endif
-    );
+                                                             ));
     return Status::OK();
 }
 
@@ -1180,6 +1180,7 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* 
pool, const TPlanNode&
     if (tnode.__isset.push_down_count) {
         _push_down_count = tnode.push_down_count;
     }
+    _shared_scan_limit.store(this->_limit, std::memory_order_relaxed);
 }
 
 template <typename LocalStateType>
diff --git a/be/src/exec/operator/scan_operator.h b/be/src/exec/operator/scan_operator.h
index 46297a71547..e4e83082559 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -417,6 +417,10 @@ protected:
     // If sort info is set, push limit to each scanner;
     int64_t _limit_per_scanner = -1;
 
+    // Shared remaining limit across all parallel instances and their scanners.
+    // Initialized to _limit (SQL LIMIT); -1 means no limit.
+    std::atomic<int64_t> _shared_scan_limit {-1};
+
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
 
     TPushAggOp::type _push_down_agg_type;
diff --git a/be/src/exec/scan/scanner_context.cpp b/be/src/exec/scan/scanner_context.cpp
index f7376fd2946..f35c6e1732a 100644
--- a/be/src/exec/scan/scanner_context.cpp
+++ b/be/src/exec/scan/scanner_context.cpp
@@ -56,7 +56,8 @@ ScannerContext::ScannerContext(RuntimeState* state, 
ScanLocalStateBase* local_st
                                const TupleDescriptor* output_tuple_desc,
                                const RowDescriptor* output_row_descriptor,
                                const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                               int64_t limit_, std::shared_ptr<Dependency> 
dependency
+                               int64_t limit_, std::shared_ptr<Dependency> 
dependency,
+                               std::atomic<int64_t>* shared_scan_limit
 #ifdef BE_TEST
                                ,
                                int num_parallel_instances
@@ -71,6 +72,7 @@ ScannerContext::ScannerContext(RuntimeState* state, 
ScanLocalStateBase* local_st
           _output_row_descriptor(output_row_descriptor),
           _batch_size(state->batch_size()),
           limit(limit_),
+          _shared_scan_limit(shared_scan_limit),
           _all_scanners(scanners.begin(), scanners.end()),
 #ifndef BE_TEST
           _scanner_scheduler(local_state->scan_scheduler(state)),
@@ -100,6 +102,27 @@ ScannerContext::ScannerContext(RuntimeState* state, 
ScanLocalStateBase* local_st
     DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
 }
 
+int64_t ScannerContext::acquire_limit_quota(int64_t desired) {
+    DCHECK(desired > 0);
+    int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
+    while (true) {
+        if (remaining < 0) {
+            // No limit set, grant all desired rows.
+            return desired;
+        }
+        if (remaining == 0) {
+            return 0;
+        }
+        int64_t granted = std::min(desired, remaining);
+        if (_shared_scan_limit->compare_exchange_weak(remaining, remaining - granted,
+                                                      std::memory_order_acq_rel,
+                                                      std::memory_order_acquire)) {
+            return granted;
+        }
+        // CAS failed, `remaining` is updated to current value, retry.
+    }
+}
+
 // After init function call, should not access _parent
 Status ScannerContext::init() {
 #ifndef BE_TEST
@@ -325,7 +348,12 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, Block* block, b
         }
     }
 
-    if (_num_finished_scanners == _all_scanners.size() && 
_tasks_queue.empty()) {
+    // Mark finished when either:
+    // (1) all scanners completed normally, or
+    // (2) shared limit exhausted and no scanners are still running.
+    if (_tasks_queue.empty() && (_num_finished_scanners == 
_all_scanners.size() ||
+                                 
(_shared_scan_limit->load(std::memory_order_acquire) == 0 &&
+                                  _num_scheduled_scanners == 0))) {
         _set_scanner_done();
         _is_finished = true;
     }
@@ -434,11 +462,12 @@ std::string ScannerContext::debug_string() {
     return fmt::format(
             "id: {}, total scanners: {}, pending tasks: {},"
             " _should_stop: {}, _is_finished: {}, free blocks: {},"
-            " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
+            " limit: {}, remaining_limit: {}, _num_running_scanners: {}, 
_max_thread_num: {},"
             " _max_bytes_in_queue: {}, query_id: {}",
             ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, 
_is_finished,
-            _free_blocks.size_approx(), limit, _num_scheduled_scanners, 
_max_scan_concurrency,
-            _max_bytes_in_queue, print_id(_query_id));
+            _free_blocks.size_approx(), limit, 
_shared_scan_limit->load(std::memory_order_relaxed),
+            _num_scheduled_scanners, _max_scan_concurrency, 
_max_bytes_in_queue,
+            print_id(_query_id));
 }
 
 void ScannerContext::_set_scanner_done() {
@@ -601,6 +630,11 @@ std::shared_ptr<ScanTask> 
ScannerContext::_pull_next_scan_task(
     }
 
     if (!_pending_scanners.empty()) {
+        // If shared limit quota is exhausted, do not submit new scanners from 
pending queue.
+        int64_t remaining = 
_shared_scan_limit->load(std::memory_order_acquire);
+        if (remaining == 0) {
+            return nullptr;
+        }
         std::shared_ptr<ScanTask> next_scan_task;
         next_scan_task = _pending_scanners.top();
         _pending_scanners.pop();
diff --git a/be/src/exec/scan/scanner_context.h b/be/src/exec/scan/scanner_context.h
index 553408ebc96..1aea5da03c9 100644
--- a/be/src/exec/scan/scanner_context.h
+++ b/be/src/exec/scan/scanner_context.h
@@ -115,7 +115,7 @@ public:
                    const TupleDescriptor* output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<ScannerDelegate>>& 
scanners, int64_t limit_,
-                   std::shared_ptr<Dependency> dependency
+                   std::shared_ptr<Dependency> dependency, 
std::atomic<int64_t>* shared_scan_limit
 #ifdef BE_TEST
                    ,
                    int num_parallel_instances
@@ -221,6 +221,12 @@ protected:
     int _batch_size;
     // The limit from SQL's limit clause
     int64_t limit;
+    // Points to the shared remaining limit on ScanOperatorX, shared across all
+    // parallel instances and their scanners. -1 means no limit.
+    std::atomic<int64_t>* _shared_scan_limit = nullptr;
+    // Atomically acquire up to `desired` rows. Returns actual granted count 
(0 = exhausted).
+    int64_t acquire_limit_quota(int64_t desired);
+    int64_t remaining_limit() const { return 
_shared_scan_limit->load(std::memory_order_acquire); }
 
     int64_t _max_bytes_in_queue = 0;
     // Using stack so that we can resubmit scanner in a LIFO order, maybe more 
cache friendly
diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp
index 3dfa1fdf4cd..9961407bdbb 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -232,6 +232,11 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     eos = true;
                     break;
                 }
+                // If shared limit quota is exhausted, stop scanning.
+                if (ctx->remaining_limit() == 0) {
+                    eos = true;
+                    break;
+                }
                 if (max_run_time_watch.elapsed_time() >
                     config::doris_scanner_max_run_time_ms * 1e6) {
                     break;
@@ -268,6 +273,23 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                 // Check column type only after block is read successfully.
                 // Or it may cause a crash when the block is not normal.
                 _make_sure_virtual_col_is_materialized(scanner, 
free_block.get());
+
+                // Shared limit quota: acquire rows from the context's shared 
pool.
+                // Discard or truncate the block if quota is exhausted.
+                if (free_block->rows() > 0) {
+                    int64_t block_rows = free_block->rows();
+                    int64_t granted = ctx->acquire_limit_quota(block_rows);
+                    if (granted == 0) {
+                        // No quota remaining, discard this block and mark eos.
+                        ctx->return_free_block(std::move(free_block));
+                        eos = true;
+                        break;
+                    } else if (granted < block_rows) {
+                        // Partial quota: truncate block to granted rows and 
mark eos.
+                        free_block->set_num_rows(granted);
+                        eos = true;
+                    }
+                }
                 // Projection will truncate useless columns, makes block size 
change.
                 auto free_block_bytes = free_block->allocated_bytes();
                 raw_bytes_read += free_block_bytes;
@@ -298,15 +320,9 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
                 }
 
+                // Per-scanner small-limit optimization: if limit is small (< 
batch_size),
+                // return immediately instead of accumulating to 
raw_bytes_threshold.
                 if (limit > 0 && limit < ctx->batch_size()) {
-                    // If this scanner has limit, and less than batch size,
-                    // return immediately and no need to wait 
raw_bytes_threshold.
-                    // This can save time that each scanner may only return a 
small number of rows,
-                    // but rows are enough from all scanners.
-                    // If not break, the query like "select * from tbl where 
id=1 limit 10"
-                    // may scan a lot data when the "id=1"'s filter ratio is 
high.
-                    // If limit is larger than batch size, this rule is 
skipped,
-                    // to avoid user specify a large limit and causing too 
much small blocks.
                     break;
                 }
 
diff --git a/be/test/exec/scan/scanner_context_test.cpp b/be/test/exec/scan/scanner_context_test.cpp
index 35156865bb3..8b76c7c5397 100644
--- a/be/test/exec/scan/scanner_context_test.cpp
+++ b/be/test/exec/scan/scanner_context_test.cpp
@@ -122,6 +122,7 @@ private:
     std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
std::make_shared<CgroupV2CpuCtl>(1);
     std::unique_ptr<ScannerScheduler> scan_scheduler =
             std::make_unique<ThreadPoolSimplifiedScanScheduler>("ForTest", 
cgroup_cpu_ctl);
+    std::atomic<int64_t> shared_limit {-1};
 };
 
 TEST_F(ScannerContextTest, test_init) {
@@ -150,7 +151,7 @@ TEST_F(ScannerContextTest, test_init) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scan_operator->_should_run_serial = false;
 
@@ -210,7 +211,7 @@ TEST_F(ScannerContextTest, test_serial_run) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scan_operator->_should_run_serial = true;
 
@@ -268,7 +269,7 @@ TEST_F(ScannerContextTest, test_max_column_reader_num) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scan_operator->_should_run_serial = false;
 
@@ -318,7 +319,7 @@ TEST_F(ScannerContextTest, test_push_back_scan_task) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scanner_context->_num_scheduled_scanners = 11;
 
@@ -355,7 +356,7 @@ TEST_F(ScannerContextTest, get_margin) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     std::mutex transfer_mutex;
     std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -451,7 +452,7 @@ TEST_F(ScannerContextTest, pull_next_scan_task) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     std::mutex transfer_mutex;
     std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -527,7 +528,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     std::mutex transfer_mutex;
     std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -559,7 +560,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
 
     scanner_context = ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scanner_context->_scanner_scheduler = scheduler.get();
 
@@ -580,7 +581,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
 
     scanner_context = ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scanner_context->_scanner_scheduler = scheduler.get();
 
@@ -596,7 +597,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
 
     scanner_context = ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scanner_context->_scanner_scheduler = scheduler.get();
 
@@ -649,7 +650,7 @@ TEST_F(ScannerContextTest, scan_queue_mem_limit) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
             std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
@@ -689,7 +690,7 @@ TEST_F(ScannerContextTest, get_free_block) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
     scanner_context->_newly_create_free_blocks_num = 
newly_create_free_blocks_num.get();
     scanner_context->_newly_create_free_blocks_num->set(0L);
     scanner_context->_scanner_memory_used_counter = 
scanner_memory_used_counter.get();
@@ -742,7 +743,7 @@ TEST_F(ScannerContextTest, return_free_block) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
     scanner_context->_newly_create_free_blocks_num = 
newly_create_free_blocks_num.get();
     scanner_context->_scanner_memory_used_counter = 
scanner_memory_used_counter.get();
     scanner_context->_max_bytes_in_queue = 200;
@@ -786,7 +787,7 @@ TEST_F(ScannerContextTest, get_block_from_queue) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
     scanner_context->_newly_create_free_blocks_num = 
newly_create_free_blocks_num.get();
     scanner_context->_scanner_memory_used_counter = 
scanner_memory_used_counter.get();
     scanner_context->_max_bytes_in_queue = 200;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to