This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new d90ac33b91d [Improvement](scan) Update scanner limit controller (#61617) (#61962)
d90ac33b91d is described below
commit d90ac33b91de5b64b40e27e8af1370bbef2f789f
Author: Pxl <[email protected]>
AuthorDate: Wed Apr 1 13:31:18 2026 +0800
[Improvement](scan) Update scanner limit controller (#61617) (#61962)
Cherry-picked from #61617.
---
be/src/exec/operator/scan_operator.cpp | 13 +++++----
be/src/exec/operator/scan_operator.h | 4 +++
be/src/exec/scan/scanner_context.cpp | 44 ++++++++++++++++++++++++++----
be/src/exec/scan/scanner_context.h | 8 +++++-
be/src/exec/scan/scanner_scheduler.cpp | 32 ++++++++++++++++------
be/test/exec/scan/scanner_context_test.cpp | 29 ++++++++++----------
6 files changed, 96 insertions(+), 34 deletions(-)
diff --git a/be/src/exec/operator/scan_operator.cpp b/be/src/exec/operator/scan_operator.cpp
index 06ef4557060..8842df4b098 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -1026,14 +1026,14 @@ template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<std::shared_ptr<ScannerDelegate>>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
- _scanner_ctx = ScannerContext::create_shared(state(), this,
p._output_tuple_desc,
- p.output_row_descriptor(),
scanners, p.limit(),
- _scan_dependency
+ _scanner_ctx.store(ScannerContext::create_shared(state(), this,
p._output_tuple_desc,
+
p.output_row_descriptor(), scanners, p.limit(),
+ _scan_dependency,
&p._shared_scan_limit
#ifdef BE_TEST
- ,
-
max_scanners_concurrency(state())
+ ,
+
max_scanners_concurrency(state())
#endif
- );
+ ));
return Status::OK();
}
@@ -1180,6 +1180,7 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
if (tnode.__isset.push_down_count) {
_push_down_count = tnode.push_down_count;
}
+ _shared_scan_limit.store(this->_limit, std::memory_order_relaxed);
}
template <typename LocalStateType>
diff --git a/be/src/exec/operator/scan_operator.h b/be/src/exec/operator/scan_operator.h
index 46297a71547..e4e83082559 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -417,6 +417,10 @@ protected:
// If sort info is set, push limit to each scanner;
int64_t _limit_per_scanner = -1;
+ // Shared remaining limit across all parallel instances and their scanners.
+ // Initialized to _limit (SQL LIMIT); -1 means no limit.
+ std::atomic<int64_t> _shared_scan_limit {-1};
+
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
TPushAggOp::type _push_down_agg_type;
diff --git a/be/src/exec/scan/scanner_context.cpp b/be/src/exec/scan/scanner_context.cpp
index f7376fd2946..f35c6e1732a 100644
--- a/be/src/exec/scan/scanner_context.cpp
+++ b/be/src/exec/scan/scanner_context.cpp
@@ -56,7 +56,8 @@ ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_st
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
- int64_t limit_, std::shared_ptr<Dependency>
dependency
+ int64_t limit_, std::shared_ptr<Dependency>
dependency,
+ std::atomic<int64_t>* shared_scan_limit
#ifdef BE_TEST
,
int num_parallel_instances
@@ -71,6 +72,7 @@ ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_st
_output_row_descriptor(output_row_descriptor),
_batch_size(state->batch_size()),
limit(limit_),
+ _shared_scan_limit(shared_scan_limit),
_all_scanners(scanners.begin(), scanners.end()),
#ifndef BE_TEST
_scanner_scheduler(local_state->scan_scheduler(state)),
@@ -100,6 +102,27 @@ ScannerContext::ScannerContext(RuntimeState* state, ScanLocalStateBase* local_st
DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
+int64_t ScannerContext::acquire_limit_quota(int64_t desired) {
+ DCHECK(desired > 0);
+ int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
+ while (true) {
+ if (remaining < 0) {
+ // No limit set, grant all desired rows.
+ return desired;
+ }
+ if (remaining == 0) {
+ return 0;
+ }
+ int64_t granted = std::min(desired, remaining);
+ if (_shared_scan_limit->compare_exchange_weak(remaining, remaining -
granted,
+
std::memory_order_acq_rel,
+
std::memory_order_acquire)) {
+ return granted;
+ }
+ // CAS failed, `remaining` is updated to current value, retry.
+ }
+}
+
// After init function call, should not access _parent
Status ScannerContext::init() {
#ifndef BE_TEST
@@ -325,7 +348,12 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, Block* block, b
}
}
- if (_num_finished_scanners == _all_scanners.size() &&
_tasks_queue.empty()) {
+ // Mark finished when either:
+ // (1) all scanners completed normally, or
+ // (2) shared limit exhausted and no scanners are still running.
+ if (_tasks_queue.empty() && (_num_finished_scanners ==
_all_scanners.size() ||
+
(_shared_scan_limit->load(std::memory_order_acquire) == 0 &&
+ _num_scheduled_scanners == 0))) {
_set_scanner_done();
_is_finished = true;
}
@@ -434,11 +462,12 @@ std::string ScannerContext::debug_string() {
return fmt::format(
"id: {}, total scanners: {}, pending tasks: {},"
" _should_stop: {}, _is_finished: {}, free blocks: {},"
- " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
+ " limit: {}, remaining_limit: {}, _num_running_scanners: {},
_max_thread_num: {},"
" _max_bytes_in_queue: {}, query_id: {}",
ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop,
_is_finished,
- _free_blocks.size_approx(), limit, _num_scheduled_scanners,
_max_scan_concurrency,
- _max_bytes_in_queue, print_id(_query_id));
+ _free_blocks.size_approx(), limit,
_shared_scan_limit->load(std::memory_order_relaxed),
+ _num_scheduled_scanners, _max_scan_concurrency,
_max_bytes_in_queue,
+ print_id(_query_id));
}
void ScannerContext::_set_scanner_done() {
@@ -601,6 +630,11 @@ std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task(
}
if (!_pending_scanners.empty()) {
+ // If shared limit quota is exhausted, do not submit new scanners from
pending queue.
+ int64_t remaining =
_shared_scan_limit->load(std::memory_order_acquire);
+ if (remaining == 0) {
+ return nullptr;
+ }
std::shared_ptr<ScanTask> next_scan_task;
next_scan_task = _pending_scanners.top();
_pending_scanners.pop();
diff --git a/be/src/exec/scan/scanner_context.h b/be/src/exec/scan/scanner_context.h
index 553408ebc96..1aea5da03c9 100644
--- a/be/src/exec/scan/scanner_context.h
+++ b/be/src/exec/scan/scanner_context.h
@@ -115,7 +115,7 @@ public:
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>&
scanners, int64_t limit_,
- std::shared_ptr<Dependency> dependency
+ std::shared_ptr<Dependency> dependency,
std::atomic<int64_t>* shared_scan_limit
#ifdef BE_TEST
,
int num_parallel_instances
@@ -221,6 +221,12 @@ protected:
int _batch_size;
// The limit from SQL's limit clause
int64_t limit;
+ // Points to the shared remaining limit on ScanOperatorX, shared across all
+ // parallel instances and their scanners. -1 means no limit.
+ std::atomic<int64_t>* _shared_scan_limit = nullptr;
+ // Atomically acquire up to `desired` rows. Returns actual granted count
(0 = exhausted).
+ int64_t acquire_limit_quota(int64_t desired);
+ int64_t remaining_limit() const { return
_shared_scan_limit->load(std::memory_order_acquire); }
int64_t _max_bytes_in_queue = 0;
// Using stack so that we can resubmit scanner in a LIFO order, maybe more
cache friendly
diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp
index 3dfa1fdf4cd..9961407bdbb 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -232,6 +232,11 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
eos = true;
break;
}
+ // If shared limit quota is exhausted, stop scanning.
+ if (ctx->remaining_limit() == 0) {
+ eos = true;
+ break;
+ }
if (max_run_time_watch.elapsed_time() >
config::doris_scanner_max_run_time_ms * 1e6) {
break;
@@ -268,6 +273,23 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
// Check column type only after block is read successfully.
// Or it may cause a crash when the block is not normal.
_make_sure_virtual_col_is_materialized(scanner,
free_block.get());
+
+ // Shared limit quota: acquire rows from the context's shared
pool.
+ // Discard or truncate the block if quota is exhausted.
+ if (free_block->rows() > 0) {
+ int64_t block_rows = free_block->rows();
+ int64_t granted = ctx->acquire_limit_quota(block_rows);
+ if (granted == 0) {
+ // No quota remaining, discard this block and mark eos.
+ ctx->return_free_block(std::move(free_block));
+ eos = true;
+ break;
+ } else if (granted < block_rows) {
+ // Partial quota: truncate block to granted rows and
mark eos.
+ free_block->set_num_rows(granted);
+ eos = true;
+ }
+ }
// Projection will truncate useless columns, makes block size
change.
auto free_block_bytes = free_block->allocated_bytes();
raw_bytes_read += free_block_bytes;
@@ -298,15 +320,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
+ // Per-scanner small-limit optimization: if limit is small (<
batch_size),
+ // return immediately instead of accumulating to
raw_bytes_threshold.
if (limit > 0 && limit < ctx->batch_size()) {
- // If this scanner has limit, and less than batch size,
- // return immediately and no need to wait
raw_bytes_threshold.
- // This can save time that each scanner may only return a
small number of rows,
- // but rows are enough from all scanners.
- // If not break, the query like "select * from tbl where
id=1 limit 10"
- // may scan a lot data when the "id=1"'s filter ratio is
high.
- // If limit is larger than batch size, this rule is
skipped,
- // to avoid user specify a large limit and causing too
much small blocks.
break;
}
diff --git a/be/test/exec/scan/scanner_context_test.cpp b/be/test/exec/scan/scanner_context_test.cpp
index 35156865bb3..8b76c7c5397 100644
--- a/be/test/exec/scan/scanner_context_test.cpp
+++ b/be/test/exec/scan/scanner_context_test.cpp
@@ -122,6 +122,7 @@ private:
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl =
std::make_shared<CgroupV2CpuCtl>(1);
std::unique_ptr<ScannerScheduler> scan_scheduler =
std::make_unique<ThreadPoolSimplifiedScanScheduler>("ForTest",
cgroup_cpu_ctl);
+ std::atomic<int64_t> shared_limit {-1};
};
TEST_F(ScannerContextTest, test_init) {
@@ -150,7 +151,7 @@ TEST_F(ScannerContextTest, test_init) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scan_operator->_should_run_serial = false;
@@ -210,7 +211,7 @@ TEST_F(ScannerContextTest, test_serial_run) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scan_operator->_should_run_serial = true;
@@ -268,7 +269,7 @@ TEST_F(ScannerContextTest, test_max_column_reader_num) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scan_operator->_should_run_serial = false;
@@ -318,7 +319,7 @@ TEST_F(ScannerContextTest, test_push_back_scan_task) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_num_scheduled_scanners = 11;
@@ -355,7 +356,7 @@ TEST_F(ScannerContextTest, get_margin) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
std::mutex transfer_mutex;
std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -451,7 +452,7 @@ TEST_F(ScannerContextTest, pull_next_scan_task) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
std::mutex transfer_mutex;
std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -527,7 +528,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
std::mutex transfer_mutex;
std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -559,7 +560,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_scanner_scheduler = scheduler.get();
@@ -580,7 +581,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_scanner_scheduler = scheduler.get();
@@ -596,7 +597,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_scanner_scheduler = scheduler.get();
@@ -649,7 +650,7 @@ TEST_F(ScannerContextTest, scan_queue_mem_limit) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
@@ -689,7 +690,7 @@ TEST_F(ScannerContextTest, get_free_block) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_newly_create_free_blocks_num =
newly_create_free_blocks_num.get();
scanner_context->_newly_create_free_blocks_num->set(0L);
scanner_context->_scanner_memory_used_counter =
scanner_memory_used_counter.get();
@@ -742,7 +743,7 @@ TEST_F(ScannerContextTest, return_free_block) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_newly_create_free_blocks_num =
newly_create_free_blocks_num.get();
scanner_context->_scanner_memory_used_counter =
scanner_memory_used_counter.get();
scanner_context->_max_bytes_in_queue = 200;
@@ -786,7 +787,7 @@ TEST_F(ScannerContextTest, get_block_from_queue) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_newly_create_free_blocks_num =
newly_create_free_blocks_num.get();
scanner_context->_scanner_memory_used_counter =
scanner_memory_used_counter.get();
scanner_context->_max_bytes_in_queue = 200;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]