This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fbcf3380971 [refactor](scanner) refactoring and optimizing scanner 
scheduling (#30746)
fbcf3380971 is described below

commit fbcf33809719140253a40c913c480f5d0f103830
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Wed Feb 7 18:08:24 2024 +0800

    [refactor](scanner) refactoring and optimizing scanner scheduling (#30746)
---
 be/src/pipeline/exec/file_scan_operator.cpp        |   8 +-
 be/src/pipeline/exec/scan_operator.cpp             |  27 +-
 be/src/pipeline/exec/scan_operator.h               |   3 +-
 be/src/runtime/runtime_state.h                     |   8 +-
 be/src/vec/exec/scan/new_file_scan_node.cpp        |   9 +-
 be/src/vec/exec/scan/pip_scanner_context.h         | 242 +---------
 be/src/vec/exec/scan/scanner_context.cpp           | 515 +++++++++------------
 be/src/vec/exec/scan/scanner_context.h             | 213 ++++-----
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 297 ++++--------
 be/src/vec/exec/scan/scanner_scheduler.h           |  44 +-
 be/src/vec/exec/scan/vscan_node.cpp                |  25 +-
 be/src/vec/exec/scan/vscan_node.h                  |   3 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  42 +-
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 14 files changed, 499 insertions(+), 939 deletions(-)

diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 51fa60f067d..ac193147dfb 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -38,8 +38,10 @@ Status 
FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     }
 
     auto& p = _parent->cast<FileScanOperatorX>();
-    size_t shard_num =
-            std::min<size_t>(config::doris_scanner_thread_pool_thread_num, 
_scan_ranges.size());
+    size_t shard_num = std::min<size_t>(
+            config::doris_scanner_thread_pool_thread_num / 
state()->query_parallel_instance_num(),
+            _scan_ranges.size());
+    shard_num = std::max(shard_num, (size_t)1);
     _kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
     for (auto& scan_range : _scan_ranges) {
         std::unique_ptr<vectorized::VFileScanner> scanner = 
vectorized::VFileScanner::create_unique(
@@ -62,7 +64,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
                                          const std::vector<TScanRangeParams>& 
scan_ranges) {
     int max_scanners =
             config::doris_scanner_thread_pool_thread_num / 
state->query_parallel_instance_num();
-    max_scanners = max_scanners == 0 ? 1 : max_scanners;
+    max_scanners = std::max(std::max(max_scanners, 
state->parallel_scan_max_scanners_count()), 1);
     // For select * from table limit 10; should just use one thread.
     if (should_run_serial()) {
         max_scanners = 1;
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index d9fced39b05..f19bed90a9e 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -160,7 +160,6 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     if (_scanner_ctx) {
         DCHECK(!_eos && _num_scanners->value() > 0);
         RETURN_IF_ERROR(_scanner_ctx->init());
-        
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
     }
     _opened = true;
     return status;
@@ -1288,16 +1287,14 @@ Status ScanLocalState<Derived>::_init_profile() {
     profile()->add_child(_scanner_profile.get(), true, nullptr);
 
     _memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, 
"MemoryUsage", 1);
-    _queued_blocks_memory_usage = _scanner_profile->AddHighWaterMarkCounter(
-            "QueuedBlocks", TUnit::BYTES, "MemoryUsage", 1);
     _free_blocks_memory_usage =
             _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", 
TUnit::BYTES, "MemoryUsage", 1);
     _newly_create_free_blocks_num =
             ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", 
TUnit::UNIT);
+    _scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, 
"NumScaleUpScanners", TUnit::UNIT);
     // time of transfer thread to wait for block from scan thread
     _scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, 
"ScannerBatchWaitTime");
     _scanner_sched_counter = ADD_COUNTER(_scanner_profile, 
"ScannerSchedCount", TUnit::UNIT);
-    _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, 
"ScannerCtxSchedCount", TUnit::UNIT);
     _scanner_ctx_sched_time = ADD_TIMER(_scanner_profile, 
"ScannerCtxSchedTime");
 
     _scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");
@@ -1456,14 +1453,10 @@ Status 
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
     }};
 
     if (state->is_cancelled()) {
-        // ISSUE: https://github.com/apache/doris/issues/16360
-        // _scanner_ctx may be null here, see: `VScanNode::alloc_resource` 
(_eos == null)
         if (local_state._scanner_ctx) {
-            
local_state._scanner_ctx->set_status_on_error(Status::Cancelled("query 
cancelled"));
-            return local_state._scanner_ctx->status();
-        } else {
-            return Status::Cancelled("query cancelled");
+            local_state._scanner_ctx->stop_scanners(state);
         }
+        return Status::Cancelled("Query cancelled in ScanOperator");
     }
 
     if (local_state._eos) {
@@ -1471,21 +1464,11 @@ Status 
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
         return Status::OK();
     }
 
-    vectorized::BlockUPtr scan_block = nullptr;
     bool eos = false;
-    RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state, 
&scan_block, &eos, 0));
-    if (eos) {
-        source_state = SourceState::FINISHED;
-        DCHECK(scan_block == nullptr);
-        return Status::OK();
-    }
-
-    // get scanner's block memory
-    block->swap(*scan_block);
-    local_state._scanner_ctx->return_free_block(std::move(scan_block));
+    RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state, 
block, &eos, 0));
 
     local_state.reached_limit(block, source_state);
-    if (eos) {
+    if (eos || source_state == SourceState::FINISHED) {
         source_state = SourceState::FINISHED;
         // reach limit, stop the scanners.
         local_state._scanner_ctx->stop_scanners(state);
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 8fac0b946ea..add2249276f 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -142,7 +142,6 @@ protected:
 
     std::shared_ptr<RuntimeProfile> _scanner_profile;
     RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
-    RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
     RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
     RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
@@ -160,8 +159,8 @@ protected:
     // time of filter output block from scanner
     RuntimeProfile::Counter* _filter_timer = nullptr;
     RuntimeProfile::Counter* _memory_usage_counter = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = 
nullptr;
     RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
+    RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
     // rows read from the scanner (including those discarded by (pre)filters)
     RuntimeProfile::Counter* _rows_read_counter = nullptr;
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 91443ef9492..38053d3cb68 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -128,7 +128,13 @@ public:
                                                         : 
_query_options.query_timeout;
     }
     int max_io_buffers() const { return _query_options.max_io_buffers; }
-    int num_scanner_threads() const { return 
_query_options.num_scanner_threads; }
+    int num_scanner_threads() const {
+        return _query_options.__isset.num_scanner_threads ? 
_query_options.num_scanner_threads : 0;
+    }
+    double scanner_scale_up_ratio() const {
+        return _query_options.__isset.scanner_scale_up_ratio ? 
_query_options.scanner_scale_up_ratio
+                                                             : 0;
+    }
     TQueryType::type query_type() const { return _query_options.query_type; }
     int64_t timestamp_ms() const { return _timestamp_ms; }
     int32_t nano_seconds() const { return _nano_seconds; }
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp 
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index da33538b8c3..2ce80f4463a 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -62,7 +62,7 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
                                       const std::vector<TScanRangeParams>& 
scan_ranges) {
     int max_scanners =
             config::doris_scanner_thread_pool_thread_num / 
state->query_parallel_instance_num();
-    max_scanners = max_scanners == 0 ? 1 : max_scanners;
+    max_scanners = std::max(std::max(max_scanners, 
state->parallel_scan_max_scanners_count()), 1);
     // For select * from table limit 10; should just use one thread.
     if (should_run_serial()) {
         max_scanners = 1;
@@ -116,9 +116,10 @@ Status 
NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
         return Status::OK();
     }
 
-    // TODO: determine kv cache shard num
-    size_t shard_num =
-            std::min<size_t>(config::doris_scanner_thread_pool_thread_num, 
_scan_ranges.size());
+    size_t shard_num = std::min<size_t>(
+            config::doris_scanner_thread_pool_thread_num / 
_state->query_parallel_instance_num(),
+            _scan_ranges.size());
+    shard_num = std::max(shard_num, (size_t)1);
     _kv_cache.reset(new ShardedKVCache(shard_num));
     for (auto& scan_range : _scan_ranges) {
         std::unique_ptr<VFileScanner> scanner =
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 62f6f9edb21..b69f7c031d4 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -36,129 +36,6 @@ public:
             : vectorized::ScannerContext(state, parent, output_tuple_desc, 
output_row_descriptor,
                                          scanners, limit_, 
max_bytes_in_blocks_queue,
                                          num_parallel_instances) {}
-
-    Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* 
block, bool* eos,
-                                int id) override {
-        {
-            std::unique_lock l(_transfer_lock);
-            if (state->is_cancelled()) {
-                set_status_on_error(Status::Cancelled("cancelled"), false);
-            }
-
-            if (!status().ok()) {
-                return _process_status;
-            }
-        }
-
-        std::vector<vectorized::BlockUPtr> merge_blocks;
-        {
-            std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
-            // The pipeline maybe wake up by scanner.done. If there are still 
any data
-            // in the queue, should read the data first and then check if the 
scanner.done
-            // if done, then eos is returned to indicate that the scan 
operator finished.
-            if (_blocks_queues[id].empty()) {
-                *eos = done();
-                return Status::OK();
-            }
-            if (_process_status.is<ErrorCode::CANCELLED>()) {
-                *eos = true;
-                return Status::OK();
-            }
-            *block = std::move(_blocks_queues[id].front());
-            _blocks_queues[id].pop_front();
-
-            auto rows = (*block)->rows();
-            while (!_blocks_queues[id].empty()) {
-                const auto add_rows = (*_blocks_queues[id].front()).rows();
-                if (rows + add_rows < state->batch_size()) {
-                    rows += add_rows;
-                    
merge_blocks.emplace_back(std::move(_blocks_queues[id].front()));
-                    _blocks_queues[id].pop_front();
-                } else {
-                    break;
-                }
-            }
-
-            if (_blocks_queues[id].empty()) {
-                this->reschedule_scanner_ctx();
-            }
-        }
-
-        _current_used_bytes -= (*block)->allocated_bytes();
-        if (!merge_blocks.empty()) {
-            vectorized::MutableBlock m(block->get());
-            for (auto& merge_block : merge_blocks) {
-                _current_used_bytes -= merge_block->allocated_bytes();
-                static_cast<void>(m.merge(*merge_block));
-                return_free_block(std::move(merge_block));
-            }
-            (*block)->set_columns(std::move(m.mutable_columns()));
-        }
-
-        // after return free blocks, should try to reschedule the scanner
-        if (should_be_scheduled()) {
-            this->reschedule_scanner_ctx();
-        }
-
-        return Status::OK();
-    }
-
-    void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) 
override {
-        const int queue_size = _blocks_queues.size();
-        const int block_size = blocks.size();
-        if (block_size == 0) {
-            return;
-        }
-        int64_t local_bytes = 0;
-
-        for (const auto& block : blocks) {
-            auto st = validate_block_schema(block.get());
-            if (!st.ok()) {
-                set_status_on_error(st, false);
-            }
-            local_bytes += block->allocated_bytes();
-        }
-
-        for (int i = 0; i < queue_size && i < block_size; ++i) {
-            int queue = _next_queue_to_feed;
-            {
-                std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
-                for (int j = i; j < block_size; j += queue_size) {
-                    _blocks_queues[queue].emplace_back(std::move(blocks[j]));
-                }
-            }
-            _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
-        }
-        _current_used_bytes += local_bytes;
-    }
-
-    bool empty_in_queue(int id) override {
-        std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
-        return _blocks_queues[id].empty();
-    }
-
-    Status init() override {
-        for (int i = 0; i < _num_parallel_instances; ++i) {
-            _queue_mutexs.emplace_back(std::make_unique<std::mutex>());
-            _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
-        }
-        return ScannerContext::init();
-    }
-
-    std::string debug_string() override {
-        auto res = ScannerContext::debug_string();
-        for (int i = 0; i < _blocks_queues.size(); ++i) {
-            res += " queue " + std::to_string(i) + ":size " +
-                   std::to_string(_blocks_queues[i].size());
-        }
-        return res;
-    }
-
-protected:
-    int _next_queue_to_feed = 0;
-    std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
-    std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
-    std::atomic_int64_t _current_used_bytes = 0;
 };
 
 class PipXScannerContext final : public vectorized::ScannerContext {
@@ -172,117 +49,38 @@ public:
                        int64_t limit_, int64_t max_bytes_in_blocks_queue,
                        std::shared_ptr<pipeline::ScanDependency> dependency)
             : vectorized::ScannerContext(state, output_tuple_desc, 
output_row_descriptor, scanners,
-                                         limit_, max_bytes_in_blocks_queue, 1, 
local_state,
-                                         dependency) {}
-    Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* 
block, bool* eos,
-                                int id) override {
-        if (_blocks_queue_buffered.empty()) {
-            std::unique_lock l(_transfer_lock);
-            if (state->is_cancelled()) {
-                set_status_on_error(Status::Cancelled("cancelled"), false);
-            }
-
-            if (!status().ok()) {
-                return _process_status;
-            }
-
-            if (_blocks_queue.empty()) {
-                *eos = done();
-                return Status::OK();
-            }
-            if (_process_status.is<ErrorCode::CANCELLED>()) {
-                *eos = true;
-                return Status::OK();
-            }
-
-            _blocks_queue_buffered = std::move(_blocks_queue);
-        }
-
-        // `get_block_from_queue` should not be called concurrently from 
multiple threads,
-        // so here no need to lock.
-        *block = std::move(_blocks_queue_buffered.front());
-        _blocks_queue_buffered.pop_front();
+                                         limit_, max_bytes_in_blocks_queue, 1, 
local_state) {
+        _dependency = dependency;
+    }
 
-        std::vector<vectorized::BlockUPtr> merge_blocks;
-        auto rows = (*block)->rows();
-        while (!_blocks_queue_buffered.empty()) {
-            const auto add_rows = (*_blocks_queue_buffered.front()).rows();
-            if (rows + add_rows < state->batch_size()) {
-                rows += add_rows;
-                
merge_blocks.emplace_back(std::move(_blocks_queue_buffered.front()));
-                _blocks_queue_buffered.pop_front();
-            } else {
-                break;
-            }
+    void append_block_to_queue(std::shared_ptr<vectorized::ScanTask> 
scan_task) override {
+        vectorized::ScannerContext::append_block_to_queue(scan_task);
+        if (_dependency) {
+            _dependency->set_ready();
         }
+    }
 
-        if (_blocks_queue_buffered.empty()) {
-            std::unique_lock l(_transfer_lock);
-            if (_blocks_queue.empty()) {
-                this->reschedule_scanner_ctx();
+    Status get_block_from_queue(RuntimeState* state, vectorized::Block* block, 
bool* eos, int id,
+                                bool wait = true) override {
+        Status st = vectorized::ScannerContext::get_block_from_queue(state, 
block, eos, id, wait);
+        std::lock_guard<std::mutex> l(_transfer_lock);
+        if (_blocks_queue.empty()) {
+            if (_dependency) {
                 _dependency->block();
-            } else {
-                _blocks_queue_buffered = std::move(_blocks_queue);
-            }
-        }
-
-        _cur_bytes_in_queue -= (*block)->allocated_bytes();
-        if (!merge_blocks.empty()) {
-            vectorized::MutableBlock m(block->get());
-            for (auto& merge_block : merge_blocks) {
-                _cur_bytes_in_queue -= merge_block->allocated_bytes();
-                static_cast<void>(m.merge(*merge_block));
-                if (merge_block->mem_reuse()) {
-                    _free_blocks_buffered.emplace_back(std::move(merge_block));
-                }
             }
-            (*block)->set_columns(std::move(m.mutable_columns()));
-        }
-        return_free_blocks();
-
-        // after return free blocks, should try to reschedule the scanner
-        if (should_be_scheduled()) {
-            this->reschedule_scanner_ctx();
         }
-
-        return Status::OK();
+        return st;
     }
 
-    void reschedule_scanner_ctx() override {
-        if (done()) {
-            return;
-        }
-        auto state = _scanner_scheduler->submit(shared_from_this());
-        //todo(wb) rethinking is it better to mark current scan_context failed 
when submit failed many times?
-        if (state.ok()) {
-            _num_scheduling_ctx++;
-        } else {
-            set_status_on_error(state, false);
+protected:
+    void _set_scanner_done() override {
+        if (_dependency) {
+            _dependency->set_scanner_done();
         }
     }
 
 private:
-    void return_free_blocks() {
-        if (_free_blocks_buffered.empty()) {
-            return;
-        }
-
-        size_t total_bytes = 0;
-        for (auto& block : _free_blocks_buffered) {
-            const auto bytes = block->allocated_bytes();
-            block->clear_column_data();
-            _estimated_block_bytes = std::max(bytes, (size_t)16);
-            total_bytes += bytes;
-        }
-        _free_blocks_memory_usage->add(total_bytes);
-        const auto count = _free_blocks_buffered.size();
-        
_free_blocks.enqueue_bulk(std::make_move_iterator(_free_blocks_buffered.begin()),
 count);
-        _free_blocks_buffered.clear();
-        _serving_blocks_num -= count;
-    }
-
-    std::vector<vectorized::BlockUPtr> _free_blocks_buffered;
-    std::list<vectorized::BlockUPtr> _blocks_queue_buffered;
+    std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index be143b9f729..45e934ee790 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -17,12 +17,10 @@
 
 #include "scanner_context.h"
 
-#include <bthread/bthread.h>
 #include <fmt/format.h>
 #include <gen_cpp/Metrics_types.h>
 #include <glog/logging.h>
 
-#include <algorithm>
 #include <mutex>
 #include <ostream>
 #include <utility>
@@ -31,64 +29,57 @@
 #include "common/status.h"
 #include "pipeline/exec/scan_operator.h"
 #include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
-#include "util/pretty_printer.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
-#include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/exec/scan/vscan_node.h"
-#include "vec/exec/scan/vscanner.h"
 
 namespace doris::vectorized {
 
 using namespace std::chrono_literals;
 
-static bvar::Status<int64_t> 
g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0);
-static bvar::Status<int64_t> 
g_num_running_scanners("doris_num_running_scanners", 0);
-
 ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* 
output_tuple_desc,
                                const RowDescriptor* output_row_descriptor,
                                const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                                int64_t limit_, int64_t 
max_bytes_in_blocks_queue,
                                const int num_parallel_instances,
-                               pipeline::ScanLocalStateBase* local_state,
-                               std::shared_ptr<pipeline::ScanDependency> 
dependency)
+                               pipeline::ScanLocalStateBase* local_state)
         : HasTaskExecutionCtx(state),
           _state(state),
-          _parent(nullptr),
           _local_state(local_state),
           _output_tuple_desc(output_row_descriptor
                                      ? 
output_row_descriptor->tuple_descriptors().front()
                                      : output_tuple_desc),
           _output_row_descriptor(output_row_descriptor),
-          _process_status(Status::OK()),
           _batch_size(state->batch_size()),
           limit(limit_),
           _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, 
(int64_t)1024) *
                               num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners.begin(), scanners.end()),
           _all_scanners(scanners.begin(), scanners.end()),
-          _num_parallel_instances(num_parallel_instances),
-          _dependency(dependency) {
+          _num_parallel_instances(num_parallel_instances) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
     ctx_id = UniqueId::gen_uid().to_string();
-    if (_scanners.empty()) {
+    // Provide more memory for wide tables, increase proportionally by 
multiples of 300
+    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
+    if (scanners.empty()) {
         _is_finished = true;
         _set_scanner_done();
     }
+    _scanners.enqueue_bulk(scanners.begin(), scanners.size());
     if (limit < 0) {
         limit = -1;
     }
-    _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
+    MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
+    _max_thread_num = _state->num_scanner_threads() > 0
+                              ? _state->num_scanner_threads()
+                              : config::doris_scanner_thread_pool_thread_num /
+                                        state->query_parallel_instance_num();
     _max_thread_num *= num_parallel_instances;
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
-    DCHECK(_max_thread_num > 0);
-    _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
+    _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
     // 1. Calculate max concurrency
     // For select * from table limit 10; should just use one thread.
     if ((_parent && _parent->should_run_serial()) ||
@@ -104,45 +95,9 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, 
doris::vectorized::VS
                                int64_t limit_, int64_t 
max_bytes_in_blocks_queue,
                                const int num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state)
-        : HasTaskExecutionCtx(state),
-          _state(state),
-          _parent(parent),
-          _local_state(local_state),
-          _output_tuple_desc(output_row_descriptor
-                                     ? 
output_row_descriptor->tuple_descriptors().front()
-                                     : output_tuple_desc),
-          _output_row_descriptor(output_row_descriptor),
-          _process_status(Status::OK()),
-          _batch_size(state->batch_size()),
-          limit(limit_),
-          _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, 
(int64_t)1024) *
-                              num_parallel_instances),
-          _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners.begin(), scanners.end()),
-          _all_scanners(scanners.begin(), scanners.end()),
-          _num_parallel_instances(num_parallel_instances) {
-    DCHECK(_output_row_descriptor == nullptr ||
-           _output_row_descriptor->tuple_descriptors().size() == 1);
-    _query_id = _state->get_query_ctx()->query_id();
-    ctx_id = UniqueId::gen_uid().to_string();
-    if (_scanners.empty()) {
-        _is_finished = true;
-        _set_scanner_done();
-    }
-    if (limit < 0) {
-        limit = -1;
-    }
-    _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
-    _max_thread_num *= num_parallel_instances;
-    _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
-    DCHECK(_max_thread_num > 0);
-    _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
-    // 1. Calculate max concurrency
-    // For select * from table limit 10; should just use one thread.
-    if ((_parent && _parent->should_run_serial()) ||
-        (_local_state && _local_state->should_run_serial())) {
-        _max_thread_num = 1;
-    }
+        : ScannerContext(state, output_tuple_desc, output_row_descriptor, 
scanners, limit_,
+                         max_bytes_in_blocks_queue, num_parallel_instances, 
local_state) {
+    _parent = parent;
 }
 
 // After init function call, should not access _parent
@@ -150,43 +105,21 @@ Status ScannerContext::init() {
     if (_parent) {
         _scanner_profile = _parent->_scanner_profile;
         _scanner_sched_counter = _parent->_scanner_sched_counter;
-        _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
-        _scanner_ctx_sched_time = _parent->_scanner_ctx_sched_time;
-        _free_blocks_memory_usage = _parent->_free_blocks_memory_usage;
         _newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num;
-        _queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage;
         _scanner_wait_batch_timer = _parent->_scanner_wait_batch_timer;
+        _free_blocks_memory_usage_mark = _parent->_free_blocks_memory_usage;
+        _scanner_ctx_sched_time = _parent->_scanner_ctx_sched_time;
+        _scale_up_scanners_counter = _parent->_scale_up_scanners_counter;
     } else {
         _scanner_profile = _local_state->_scanner_profile;
         _scanner_sched_counter = _local_state->_scanner_sched_counter;
-        _scanner_ctx_sched_counter = _local_state->_scanner_ctx_sched_counter;
-        _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
-        _free_blocks_memory_usage = _local_state->_free_blocks_memory_usage;
         _newly_create_free_blocks_num = 
_local_state->_newly_create_free_blocks_num;
-        _queued_blocks_memory_usage = 
_local_state->_queued_blocks_memory_usage;
         _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
+        _free_blocks_memory_usage_mark = 
_local_state->_free_blocks_memory_usage;
+        _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
+        _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
     }
 
-    // 2. Calculate the number of free blocks that all scanners can use.
-    // The calculation logic is as follows:
-    //  1. Assuming that at most M rows can be scanned in one 
scan(config::doris_scanner_row_num),
-    //     then figure out how many blocks are required for one 
scan(_block_per_scanner).
-    //  2. The maximum number of concurrency * the blocks required for one 
scan,
-    //     that is, the number of blocks that all scanners can use.
-    auto doris_scanner_row_num =
-            limit == -1 ? config::doris_scanner_row_num
-                        : 
std::min(static_cast<int64_t>(config::doris_scanner_row_num), limit);
-    int real_block_size =
-            limit == -1 ? _batch_size : 
std::min(static_cast<int64_t>(_batch_size), limit);
-    _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / 
real_block_size;
-    _free_blocks_capacity = _max_thread_num * _block_per_scanner;
-    auto block = get_free_block();
-    _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
-    int min_blocks = (config::min_bytes_in_scanner_queue + 
_estimated_block_bytes - 1) /
-                     _estimated_block_bytes;
-    _free_blocks_capacity = std::max(_free_blocks_capacity, min_blocks);
-    return_free_block(std::move(block));
-
 #ifndef BE_TEST
     // 3. get thread token
     if (_state->get_query_ctx()) {
@@ -198,8 +131,6 @@ Status ScannerContext::init() {
     }
 #endif
 
-    _num_unfinished_scanners = _scanners.size();
-
     if (_parent) {
         COUNTER_SET(_parent->_max_scanner_thread_num, 
(int64_t)_max_thread_num);
         _parent->_runtime_profile->add_info_string("UseSpecificThreadToken",
@@ -210,6 +141,17 @@ Status ScannerContext::init() {
                                                         thread_token == 
nullptr ? "False" : "True");
     }
 
+    // submit `_max_thread_num` running scanners to `ScannerScheduler`
+    // When a running scanner is finished, it will submit one of the 
remaining scanners.
+    for (int i = 0; i < _max_thread_num; ++i) {
+        std::weak_ptr<ScannerDelegate> next_scanner;
+        if (_scanners.try_dequeue(next_scanner)) {
+            vectorized::BlockUPtr block = get_free_block();
+            submit_scan_task(std::make_shared<ScanTask>(next_scanner, 
std::move(block)));
+            _num_running_scanners++;
+        }
+    }
+
     return Status::OK();
 }
 
@@ -220,138 +162,221 @@ std::string ScannerContext::parent_name() {
 vectorized::BlockUPtr ScannerContext::get_free_block() {
     vectorized::BlockUPtr block;
     if (_free_blocks.try_dequeue(block)) {
+        std::lock_guard<std::mutex> fl(_free_blocks_lock);
         DCHECK(block->mem_reuse());
-        _free_blocks_memory_usage->add(-block->allocated_bytes());
-        _serving_blocks_num++;
+        _free_blocks_memory_usage -= block->allocated_bytes();
+        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
         return block;
     }
 
-    block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 
_batch_size,
-                                             true /*ignore invalid slots*/);
-
-    COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
-
-    _serving_blocks_num++;
-    return block;
+    _newly_create_free_blocks_num->update(1);
+    return vectorized::Block::create_unique(_output_tuple_desc->slots(), 
_batch_size,
+                                            true /*ignore invalid slots*/);
 }
 
-void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> 
block) {
-    _serving_blocks_num--;
-    if (block->mem_reuse()) {
-        // Only put blocks with schema to free blocks, because colocate blocks
-        // need schema.
-        _estimated_block_bytes = std::max(block->allocated_bytes(), 
(size_t)16);
+void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
+    std::lock_guard<std::mutex> fl(_free_blocks_lock);
+    if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) 
{
         block->clear_column_data();
-        _free_blocks_memory_usage->add(block->allocated_bytes());
+        _free_blocks_memory_usage += block->allocated_bytes();
+        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
         _free_blocks.enqueue(std::move(block));
     }
 }
 
-void 
ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& 
blocks) {
-    std::lock_guard l(_transfer_lock);
-    auto old_bytes_in_queue = _cur_bytes_in_queue;
-    for (auto& b : blocks) {
-        auto st = validate_block_schema(b.get());
+bool ScannerContext::empty_in_queue(int id) {
+    std::lock_guard<std::mutex> l(_transfer_lock);
+    return _blocks_queue.empty();
+}
+
+void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
+    _scanner_sched_counter->update(1);
+    _num_scheduled_scanners++;
+    _scanner_scheduler->submit(shared_from_this(), scan_task);
+}
+
+void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> 
scan_task) {
+    if (scan_task->status_ok() && scan_task->current_block->rows() > 0) {
+        Status st = validate_block_schema(scan_task->current_block.get());
         if (!st.ok()) {
-            set_status_on_error(st, false);
+            scan_task->set_status(st);
         }
-        _cur_bytes_in_queue += b->allocated_bytes();
-        _blocks_queue.push_back(std::move(b));
     }
-    blocks.clear();
-    if (_dependency) {
-        _dependency->set_ready();
+    std::lock_guard<std::mutex> l(_transfer_lock);
+    if (!scan_task->status_ok()) {
+        _process_status = scan_task->get_status();
+    }
+    if (_last_scale_up_time == 0) {
+        _last_scale_up_time = UnixMillis();
+    }
+    if (_blocks_queue.empty() && _last_fetch_time != 0) {
+        // there's no block in queue before current block, so the consumer is 
waiting
+        _total_wait_block_time += UnixMillis() - _last_fetch_time;
     }
+    _num_scheduled_scanners--;
+    _blocks_queue.emplace_back(scan_task);
     _blocks_queue_added_cv.notify_one();
-    _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
-    g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
 }
 
-bool ScannerContext::empty_in_queue(int id) {
+Status ScannerContext::get_block_from_queue(RuntimeState* state, 
vectorized::Block* block,
+                                            bool* eos, int id, bool wait) {
+    if (state->is_cancelled()) {
+        _set_scanner_done();
+        return Status::Cancelled("Query cancelled in ScannerContext");
+    }
     std::unique_lock l(_transfer_lock);
-    return _blocks_queue.empty();
-}
+    // Wait for block from queue
+    if (wait) {
+        // scanner batch wait time
+        SCOPED_TIMER(_scanner_wait_batch_timer);
+        while (!done() && _blocks_queue.empty() && _process_status.ok()) {
+            _blocks_queue_added_cv.wait_for(l, 1s);
+        }
+    }
+    if (!_process_status.ok()) {
+        _set_scanner_done();
+        return _process_status;
+    }
+    std::shared_ptr<ScanTask> scan_task = nullptr;
+    if (!_blocks_queue.empty() && !done()) {
+        _last_fetch_time = UnixMillis();
+        scan_task = _blocks_queue.front();
+        _blocks_queue.pop_front();
+    }
 
-Status ScannerContext::get_block_from_queue(RuntimeState* state, 
vectorized::BlockUPtr* block,
-                                            bool* eos, int id) {
-    std::vector<vectorized::BlockUPtr> merge_blocks;
-    {
-        std::unique_lock l(_transfer_lock);
-        // Normally, the scanner scheduler will schedule ctx.
-        // But when the amount of data in the blocks queue exceeds the upper 
limit,
-        // the scheduler will stop scheduling.
-        // (if the scheduler continues to schedule, it will cause a lot of 
busy running).
-        // At this point, consumers are required to trigger new scheduling to 
ensure that
-        // data can be continuously fetched.
-        bool to_be_schedule = should_be_scheduled();
-
-        bool is_scheduled = false;
-        if (!done() && to_be_schedule && _num_running_scanners == 0) {
-            is_scheduled = true;
-            auto submit_status = 
_scanner_scheduler->submit(shared_from_this());
-            if (!submit_status.ok()) {
-                set_status_on_error(submit_status, false);
+    if (scan_task) {
+        if (!scan_task->status_ok()) {
+            _set_scanner_done();
+            return scan_task->get_status();
+        }
+        // We can only know the block size after reading at least one block
+        // Just take the size of first block as `_estimated_block_size`
+        if (scan_task->first_block) {
+            std::lock_guard<std::mutex> fl(_free_blocks_lock);
+            size_t block_size = scan_task->current_block->allocated_bytes();
+            _free_blocks_memory_usage += block_size;
+            _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+            scan_task->first_block = false;
+            if (block_size > _estimated_block_size) {
+                _estimated_block_size = block_size;
             }
         }
-
-        // Wait for block from queue
-        {
-            SCOPED_TIMER(_scanner_wait_batch_timer);
-            // scanner batch wait time
-            while (!(!_blocks_queue.empty() || done() || !status().ok() || 
state->is_cancelled())) {
-                if (!is_scheduled && _num_running_scanners == 0 && 
should_be_scheduled()) {
-                    LOG(INFO) << debug_string();
+        // consume current block
+        block->swap(*scan_task->current_block);
+        if (!scan_task->current_block->mem_reuse()) {
+            // it depends on the memory strategy of ScanNode/ScanOperator
+            // we should double check `mem_reuse()` of `current_block` to make 
sure it can be reused
+            _newly_create_free_blocks_num->update(1);
+            scan_task->current_block = 
vectorized::Block::create_unique(_output_tuple_desc->slots(),
+                                                                        
_batch_size, true);
+        }
+        if (scan_task->is_eos()) { // current scanner is finished, and no more 
data to read
+            _num_finished_scanners++;
+            std::weak_ptr<ScannerDelegate> next_scanner;
+            // submit one of the remaining scanners
+            if (_scanners.try_dequeue(next_scanner)) {
+                // reuse current running scanner, just reset some states.
+                scan_task->reuse_scanner(next_scanner);
+                submit_scan_task(scan_task);
+            } else {
+                // no more scanner to be scheduled
+                // `_free_blocks` serve all running scanners, maybe it's too 
large for the remaining scanners
+                int free_blocks_for_each = _free_blocks.size_approx() / 
_num_running_scanners;
+                _num_running_scanners--;
+                std::lock_guard<std::mutex> fl(_free_blocks_lock);
+                for (int i = 0; i < free_blocks_for_each; ++i) {
+                    vectorized::BlockUPtr removed_block;
+                    if (_free_blocks.try_dequeue(removed_block)) {
+                        _free_blocks_memory_usage -= block->allocated_bytes();
+                        
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+                    }
                 }
-                _blocks_queue_added_cv.wait_for(l, 1s);
             }
+        } else {
+            // resubmit current running scanner to read the next block
+            submit_scan_task(scan_task);
         }
+        // scale up
+        _try_to_scale_up();
+    }
 
-        if (state->is_cancelled()) {
-            set_status_on_error(Status::Cancelled("cancelled"), false);
-        }
+    if (_num_finished_scanners == _all_scanners.size() && 
_blocks_queue.empty()) {
+        _set_scanner_done();
+        _is_finished = true;
+    }
+    *eos = done();
+    return Status::OK();
+}
 
-        if (!status().ok()) {
-            return status();
+void ScannerContext::_try_to_scale_up() {
+    // Four criteria to determine whether to increase the parallelism of the 
scanners
+    // 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
+    // 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get 
blocks
+    // 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough 
memory to scale up
+    // 4. Scale up to at most `MAX_SCALE_UP_RATIO` * `_max_thread_num` scanners
+    if (MAX_SCALE_UP_RATIO > 0 && _scanners.size_approx() > 0 &&
+        (_num_running_scanners < _max_thread_num * MAX_SCALE_UP_RATIO) &&
+        (_last_fetch_time - _last_scale_up_time > SCALE_UP_DURATION) && // 
duration > 5000ms
+        (_total_wait_block_time > (_last_fetch_time - _last_scale_up_time) *
+                                          WAIT_BLOCK_DURATION_RATIO)) { // too 
large lock time
+        double wait_ratio =
+                (double)_total_wait_block_time / (_last_fetch_time - 
_last_scale_up_time);
+        if (_last_wait_duration_ratio > 0 && wait_ratio > 
_last_wait_duration_ratio * 0.8) {
+            // when _last_wait_duration_ratio > 0, it has scaled up before.
+            // we need to determine if the scale-up is effective:
+            // the wait duration ratio after last scaling up should be less than 
80% of `_last_wait_duration_ratio`
+            return;
         }
 
-        if (!_blocks_queue.empty()) {
-            *block = std::move(_blocks_queue.front());
-            _blocks_queue.pop_front();
-            auto block_bytes = (*block)->allocated_bytes();
-            _cur_bytes_in_queue -= block_bytes;
-            _queued_blocks_memory_usage->add(-block_bytes);
-
-            auto rows = (*block)->rows();
-            while (!_blocks_queue.empty()) {
-                auto& add_block = _blocks_queue.front();
-                const auto add_rows = (*add_block).rows();
-                if (rows + add_rows < state->batch_size()) {
-                    rows += add_rows;
-                    block_bytes = (*add_block).allocated_bytes();
-                    _cur_bytes_in_queue -= block_bytes;
-                    _queued_blocks_memory_usage->add(-block_bytes);
-                    merge_blocks.emplace_back(std::move(add_block));
-                    _blocks_queue.pop_front();
+        std::lock_guard<std::mutex> fl(_free_blocks_lock);
+        bool is_scale_up = false;
+        // calculate the number of scanners that can be scheduled
+        int num_add = std::min(_num_running_scanners * SCALE_UP_RATIO,
+                               _max_thread_num * MAX_SCALE_UP_RATIO - 
_num_running_scanners);
+        num_add = std::max(num_add, 1);
+        for (int i = 0; i < num_add; ++i) {
+            vectorized::BlockUPtr allocate_block = nullptr;
+            // first, try to reuse a block from `_free_blocks`
+            if (!_free_blocks.try_dequeue(allocate_block)) {
+                if (_free_blocks_memory_usage < _max_bytes_in_queue) {
+                    _newly_create_free_blocks_num->update(1);
+                    allocate_block = 
vectorized::Block::create_unique(_output_tuple_desc->slots(),
+                                                                      
_batch_size, true);
+                }
+            } else {
+                // comes from `_free_blocks`, decrease first, then will be 
added back.
+                _free_blocks_memory_usage -= allocate_block->allocated_bytes();
+                _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+            }
+            if (allocate_block) {
+                // get enough memory to launch one more scanner.
+                std::weak_ptr<ScannerDelegate> scale_up_scanner;
+                if (_scanners.try_dequeue(scale_up_scanner)) {
+                    std::shared_ptr<ScanTask> scale_up_task =
+                            std::make_shared<ScanTask>(scale_up_scanner, 
std::move(allocate_block));
+                    _free_blocks_memory_usage += _estimated_block_size;
+                    
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+                    // `first_block` is used to update 
`_free_blocks_memory_usage`,
+                    // we have got the `_estimated_block_size`, no need for 
further updates
+                    scale_up_task->first_block = false;
+                    submit_scan_task(scale_up_task);
+                    _num_running_scanners++;
+                    _scale_up_scanners_counter->update(1);
+                    is_scale_up = true;
                 } else {
                     break;
                 }
+            } else {
+                break;
             }
-        } else {
-            *eos = done();
         }
-    }
 
-    g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
-    if (!merge_blocks.empty()) {
-        vectorized::MutableBlock m(block->get());
-        for (auto& merge_block : merge_blocks) {
-            static_cast<void>(m.merge(*merge_block));
-            return_free_block(std::move(merge_block));
+        if (is_scale_up) {
+            _last_wait_duration_ratio = wait_ratio;
+            _last_scale_up_time = UnixMillis();
+            _total_wait_block_time = 0;
         }
-        (*block)->set_columns(std::move(m.mutable_columns()));
     }
-
-    return Status::OK();
 }
 
 Status ScannerContext::validate_block_schema(Block* block) {
@@ -380,29 +405,17 @@ Status ScannerContext::validate_block_schema(Block* 
block) {
     return Status::OK();
 }
 
-void ScannerContext::inc_num_running_scanners(int32_t inc) {
-    std::lock_guard l(_transfer_lock);
-    _num_running_scanners += inc;
-    g_num_running_scanners.set_value(_num_running_scanners);
-}
-
-void ScannerContext::set_status_on_error(const Status& status, bool need_lock) 
{
-    std::unique_lock l(_transfer_lock, std::defer_lock);
-    if (need_lock) {
-        l.lock();
-    }
-    if (this->status().ok()) {
-        _process_status = status;
-        _blocks_queue_added_cv.notify_one();
-        _should_stop = true;
-        _set_scanner_done();
-        LOG(INFO) << "ctx is set status on error " << debug_string()
-                  << ", call stack is: " << Status::InternalError<true>("catch 
error status");
-    }
+void ScannerContext::set_status_on_error(const Status& status) {
+    std::lock_guard<std::mutex> l(_transfer_lock);
+    _process_status = status;
+    _blocks_queue_added_cv.notify_one();
 }
 
 void ScannerContext::stop_scanners(RuntimeState* state) {
-    std::unique_lock l(_transfer_lock);
+    std::lock_guard<std::mutex> l(_transfer_lock);
+    if (_should_stop) {
+        return;
+    }
     _should_stop = true;
     _set_scanner_done();
     for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
@@ -453,95 +466,15 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
     _blocks_queue_added_cv.notify_one();
 }
 
-void ScannerContext::_set_scanner_done() {
-    if (_dependency) {
-        _dependency->set_scanner_done();
-    }
-}
-
 std::string ScannerContext::debug_string() {
     return fmt::format(
-            "id: {}, total scanners: {}, scanners: {}, blocks in queue: {},"
-            " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
+            "id: {}, total scanners: {}, blocks in queue: {},"
+            " _should_stop: {}, _is_finished: {}, free blocks: {},"
             " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
-            " _block_per_scanner: {}, _cur_bytes_in_queue: {}, 
MAX_BYTE_OF_QUEUE: {}, "
-            "num_ctx_scheduled: {}, serving_blocks_num: {}, 
allowed_blocks_num: {}, query_id: {}",
-            ctx_id, _all_scanners.size(), _scanners.size(), 
_blocks_queue.size(),
-            _process_status.to_string(), _should_stop, _is_finished, 
_free_blocks.size_approx(),
-            limit, _num_running_scanners, _max_thread_num, _block_per_scanner, 
_cur_bytes_in_queue,
-            _max_bytes_in_queue, num_ctx_scheduled(), _serving_blocks_num, 
allowed_blocks_num(),
-            print_id(_query_id));
-}
-
-void ScannerContext::reschedule_scanner_ctx() {
-    std::lock_guard l(_transfer_lock);
-    if (done()) {
-        return;
-    }
-    auto submit_status = _scanner_scheduler->submit(shared_from_this());
-    //todo(wb) rethinking is it better to mark current scan_context failed 
when submit failed many times?
-    if (!submit_status.ok()) {
-        set_status_on_error(submit_status, false);
-    }
-}
-
-void 
ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate>
 scanner) {
-    std::lock_guard l(_transfer_lock);
-    // Use a transfer lock to avoid the scanner be scheduled concurrently. For 
example, that after
-    // calling "_scanners.push_front(scanner)", there may be other ctx in 
scheduler
-    // to schedule that scanner right away, and in that schedule run, the 
scanner may be marked as closed
-    // before we call the following if() block.
-    {
-        --_num_running_scanners;
-        g_num_running_scanners.set_value(_num_running_scanners);
-        if (scanner->_scanner->need_to_close()) {
-            --_num_unfinished_scanners;
-            if (_num_unfinished_scanners == 0) {
-                _is_finished = true;
-                _set_scanner_done();
-                _blocks_queue_added_cv.notify_one();
-                return;
-            }
-        } else {
-            _scanners.push_front(scanner);
-        }
-    }
-
-    if (should_be_scheduled()) {
-        auto submit_status = _scanner_scheduler->submit(shared_from_this());
-        if (!submit_status.ok()) {
-            set_status_on_error(submit_status, false);
-        }
-    }
-}
-
-// This method is called in scanner scheduler, and task context is hold
-void ScannerContext::get_next_batch_of_scanners(
-        std::list<std::weak_ptr<ScannerDelegate>>* current_run) {
-    std::lock_guard l(_transfer_lock);
-    // Update the sched counter for profile
-    Defer defer {[&]() { _scanner_sched_counter->update(current_run->size()); 
}};
-    // 1. Calculate how many scanners should be scheduled at this run.
-    // If there are enough space in blocks queue,
-    // the scanner number depends on the _free_blocks numbers
-    int thread_slot_num = get_available_thread_slot_num();
-
-    // 2. get #thread_slot_num scanners from ctx->scanners
-    // and put them into "this_run".
-    for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
-        std::weak_ptr<ScannerDelegate> scanner_ref = _scanners.front();
-        std::shared_ptr<ScannerDelegate> scanner = scanner_ref.lock();
-        _scanners.pop_front();
-        if (scanner == nullptr) {
-            continue;
-        }
-        if (scanner->_scanner->need_to_close()) {
-            static_cast<void>(scanner->_scanner->close(_state));
-        } else {
-            current_run->push_back(scanner_ref);
-            i++;
-        }
-    }
+            " _max_bytes_in_queue: {}, query_id: {}",
+            ctx_id, _all_scanners.size(), _blocks_queue.size(), _should_stop, 
_is_finished,
+            _free_blocks.size_approx(), limit, _num_scheduled_scanners, 
_max_thread_num,
+            _max_bytes_in_queue, print_id(_query_id));
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 4d936d72a13..58e6f4dae7f 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -58,6 +58,47 @@ class VScanNode;
 class ScannerScheduler;
 class SimplifiedScanScheduler;
 
+class ScanTask {
+public:
+    ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner, 
vectorized::BlockUPtr free_block)
+            : scanner(delegate_scanner), current_block(std::move(free_block)) 
{}
+
+private:
+    // whether current scanner is finished
+    bool eos = false;
+    Status status = Status::OK();
+
+public:
+    std::weak_ptr<ScannerDelegate> scanner;
+    // cache the block of current loop
+    vectorized::BlockUPtr current_block;
+    // only take the size of the first block as estimated size
+    bool first_block = true;
+    uint64_t last_submit_time; // nanoseconds
+
+    void set_status(Status _status) {
+        if (_status.is<ErrorCode::END_OF_FILE>()) {
+            // set `eos` if `END_OF_FILE`, don't take `END_OF_FILE` as error
+            eos = true;
+        }
+        status = _status;
+    }
+    Status get_status() const { return status; }
+    bool status_ok() { return status.ok() || 
status.is<ErrorCode::END_OF_FILE>(); }
+    bool is_eos() const { return eos; }
+    void set_eos(bool _eos) { eos = _eos; }
+
+    // reuse current running scanner
+    // reset `eos` and `status`
+    // `first_block` is used to update `_free_blocks_memory_usage`, and take 
the first block size
+    // as the `_estimated_block_size`. It has updated 
`_free_blocks_memory_usage`, so don't reset.
+    void reuse_scanner(std::weak_ptr<ScannerDelegate> next_scanner) {
+        scanner = next_scanner;
+        eos = false;
+        status = Status::OK();
+    }
+};
+
 // ScannerContext is responsible for recording the execution status
 // of a group of Scanners corresponding to a ScanNode.
 // Including how many scanners are being scheduled, and maintaining
@@ -81,88 +122,49 @@ public:
     virtual Status init();
 
     vectorized::BlockUPtr get_free_block();
-    void return_free_block(std::unique_ptr<vectorized::Block> block);
+    void return_free_block(vectorized::BlockUPtr block);
 
-    // Append blocks from scanners to the blocks queue.
-    virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& 
blocks);
-    // Get next block from blocks queue. Called by ScanNode
+    // Get next block from blocks queue. Called by ScanNode/ScanOperator
     // Set eos to true if there is no more data to read.
-    // And if eos is true, the block returned must be nullptr.
-    virtual Status get_block_from_queue(RuntimeState* state, 
vectorized::BlockUPtr* block,
-                                        bool* eos, int id);
+    virtual Status get_block_from_queue(RuntimeState* state, 
vectorized::Block* block, bool* eos,
+                                        int id, bool wait = true);
 
     [[nodiscard]] Status validate_block_schema(Block* block);
 
-    // When a scanner complete a scan, this method will be called
-    // to return the scanner to the list for next scheduling.
-    void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> 
scanner);
+    // submit the running scanner to thread pool in `ScannerScheduler`
+    // set the next scanned block to `ScanTask::current_block`
+    // set the error state to `ScanTask::status`
+    // set the `eos` to `ScanTask::eos` if there is no more data in current 
scanner
+    void submit_scan_task(std::shared_ptr<ScanTask> scan_task);
 
-    void set_status_on_error(const Status& status, bool need_lock = true);
+    // append the running scanner and its cached block to `_blocks_queue`
+    virtual void append_block_to_queue(std::shared_ptr<ScanTask> scan_task);
 
-    Status status() {
-        if (_process_status.is<ErrorCode::END_OF_FILE>()) {
-            return Status::OK();
-        }
-        return _process_status;
-    }
+    void set_status_on_error(const Status& status);
 
     // Return true if this ScannerContext need no more process
     bool done() const { return _is_finished || _should_stop; }
     bool is_finished() { return _is_finished.load(); }
     bool should_stop() { return _should_stop.load(); }
 
-    void inc_num_running_scanners(int32_t scanner_inc);
-
-    int get_num_running_scanners() const { return _num_running_scanners; }
-
-    int get_num_unfinished_scanners() const { return _num_unfinished_scanners; 
}
-
-    void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>* 
current_run);
-
     virtual std::string debug_string();
 
     RuntimeState* state() { return _state; }
-
-    void incr_num_ctx_scheduling(int64_t num) { 
_scanner_ctx_sched_counter->update(num); }
-
-    int64_t num_ctx_scheduled() { return _scanner_ctx_sched_counter->value(); }
     void incr_ctx_scheduling_time(int64_t num) { 
_scanner_ctx_sched_time->update(num); }
 
     std::string parent_name();
 
     virtual bool empty_in_queue(int id);
 
-    // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when 
executing shared scan
-    inline bool should_be_scheduled() const {
-        return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
-               (_serving_blocks_num < allowed_blocks_num());
-    }
-
-    int get_available_thread_slot_num() {
-        int thread_slot_num = 0;
-        thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / 
_block_per_scanner;
-        thread_slot_num = std::min(thread_slot_num, _max_thread_num - 
_num_running_scanners);
-        if (thread_slot_num <= 0) {
-            thread_slot_num = 1;
-        }
-        return thread_slot_num;
-    }
-
-    int32_t allowed_blocks_num() const {
-        int32_t blocks_num = std::min(_free_blocks_capacity,
-                                      int32_t((_max_bytes_in_queue + 
_estimated_block_bytes - 1) /
-                                              _estimated_block_bytes));
-        return blocks_num;
-    }
-
     SimplifiedScanScheduler* get_simple_scan_scheduler() { return 
_simple_scan_scheduler; }
 
-    virtual void reschedule_scanner_ctx();
     void stop_scanners(RuntimeState* state);
 
     int32_t get_max_thread_num() const { return _max_thread_num; }
     void set_max_thread_num(int32_t num) { _max_thread_num = num; }
 
+    int batch_size() const { return _batch_size; }
+
     // the unique id of this context
     std::string ctx_id;
     TUniqueId _query_id;
@@ -176,10 +178,16 @@ protected:
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<ScannerDelegate>>& 
scanners_, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue_, const int 
num_parallel_instances,
-                   pipeline::ScanLocalStateBase* local_state,
-                   std::shared_ptr<pipeline::ScanDependency> dependency);
+                   pipeline::ScanLocalStateBase* local_state);
+
+    /// Four criteria to determine whether to increase the parallelism of the 
scanners
+    /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
+    /// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get 
blocks
+    /// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough 
memory to scale up
+    /// 4. Scale up to at most `MAX_SCALE_UP_RATIO` * `_max_thread_num` scanners
+    virtual void _set_scanner_done() {};
 
-    void _set_scanner_done();
+    void _try_to_scale_up();
 
     RuntimeState* _state = nullptr;
     VScanNode* _parent = nullptr;
@@ -189,97 +197,52 @@ protected:
     const TupleDescriptor* _output_tuple_desc = nullptr;
     const RowDescriptor* _output_row_descriptor = nullptr;
 
-    // _transfer_lock is used to protect the critical section
-    // where the ScanNode and ScannerScheduler interact.
-    // Including access to variables such as blocks_queue, _process_status, 
_is_finished, etc.
     std::mutex _transfer_lock;
-    // The blocks got from scanners will be added to the "blocks_queue".
-    // And the upper scan node will be as a consumer to fetch blocks from this 
queue.
-    // Should be protected by "_transfer_lock"
-    std::list<vectorized::BlockUPtr> _blocks_queue;
-    // Wait in get_block_from_queue(), by ScanNode.
     std::condition_variable _blocks_queue_added_cv;
-    // Wait in clear_and_join(), by ScanNode.
-    std::condition_variable _ctx_finish_cv;
-
-    // The following 3 variables control the process of the scanner scheduling.
-    // Use _transfer_lock to protect them.
-    // 1. _process_status
-    //      indicates the global status of this scanner context.
-    //      Set to non-ok if encounter errors.
-    //      And if it is non-ok, the scanner process should stop.
-    //      Set be set by either ScanNode or ScannerScheduler.
-    // 2. _should_stop
-    //      Always be set by ScanNode.
-    //      True means no more data need to be read(reach limit or closed)
-    // 3. _is_finished
-    //      Always be set by ScannerScheduler.
-    //      True means all scanners are finished to scan.
-    Status _process_status;
+    std::list<std::shared_ptr<ScanTask>> _blocks_queue;
+
+    Status _process_status = Status::OK();
     std::atomic_bool _should_stop = false;
     std::atomic_bool _is_finished = false;
 
     // Lazy-allocated blocks for all scanners to share, for memory reuse.
     moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
-    std::atomic<int32_t> _serving_blocks_num = 0;
-    // The current number of free blocks available to the scanners.
-    // Used to limit the memory usage of the scanner.
-    // NOTE: this is NOT the size of `_free_blocks`.
-    int32_t _free_blocks_capacity = 0;
-    int64_t _estimated_block_bytes = 0;
 
     int _batch_size;
     // The limit from SQL's limit clause
     int64_t limit;
 
-    // Current number of running scanners.
-    std::atomic_int32_t _num_running_scanners = 0;
-    // Current number of ctx being scheduled.
-    // After each Scanner finishes a task, it will put the corresponding ctx
-    // back into the scheduling queue.
-    // Therefore, there will be multiple pointer of same ctx in the scheduling 
queue.
-    // Here we record the number of ctx in the scheduling  queue to clean up 
at the end.
-    std::atomic_int32_t _num_scheduling_ctx = 0;
-    // Num of unfinished scanners. Should be set in init()
-    std::atomic_int32_t _num_unfinished_scanners = 0;
-    // Max number of scan thread for this scanner context.
     int32_t _max_thread_num = 0;
-    // How many blocks a scanner can use in one task.
-    int32_t _block_per_scanner = 0;
-
-    // The current bytes of blocks in blocks queue
-    int64_t _cur_bytes_in_queue = 0;
-    // The max limit bytes of blocks in blocks queue
-    const int64_t _max_bytes_in_queue;
-
+    int64_t _max_bytes_in_queue;
     doris::vectorized::ScannerScheduler* _scanner_scheduler;
     SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
-    // List "scanners" saves all "unfinished" scanners.
-    // The scanner scheduler will pop scanners from this list, run scanner,
-    // and then if the scanner is not finished, will be pushed back to this 
list.
-    // Not need to protect by lock, because only one scheduler thread will 
access to it.
-    std::mutex _scanners_lock;
-    // Scanner's ownership belong to vscannode or scanoperator, scanner 
context does not own it.
-    // ScannerContext has to check if scanner is deconstructed before use it.
-    std::list<std::weak_ptr<ScannerDelegate>> _scanners;
+    moodycamel::ConcurrentQueue<std::weak_ptr<ScannerDelegate>> _scanners;
+    int32_t _num_scheduled_scanners = 0;
+    int32_t _num_finished_scanners = 0;
+    int32_t _num_running_scanners = 0;
     // weak pointer for _scanners, used in stop function
     std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
-    std::vector<int64_t> _finished_scanner_runtime;
-    std::vector<int64_t> _finished_scanner_rows_read;
-    std::vector<int64_t> _finished_scanner_wait_worker_time;
-
     const int _num_parallel_instances;
-
     std::shared_ptr<RuntimeProfile> _scanner_profile;
     RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
-    RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
-    RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = 
nullptr;
     RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
-
-    std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage_mark = 
nullptr;
+    RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
+    RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
+
+    // for scaling up the running scanners
+    std::mutex _free_blocks_lock;
+    size_t _estimated_block_size = 0;
+    int64_t _free_blocks_memory_usage = 0;
+    int64_t _last_scale_up_time = 0;
+    int64_t _last_fetch_time = 0;
+    int64_t _total_wait_block_time = 0;
+    double _last_wait_duration_ratio = 0;
+    const int64_t SCALE_UP_DURATION = 5000; // 5000ms
+    const float WAIT_BLOCK_DURATION_RATIO = 0.5;
+    const float SCALE_UP_RATIO = 0.5;
+    float MAX_SCALE_UP_RATIO;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index d8678bc0dc3..40fff7ed70c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -69,10 +69,6 @@ ScannerScheduler::~ScannerScheduler() {
         return;
     }
 
-    for (int i = 0; i < QUEUE_NUM; i++) {
-        delete _pending_queues[i];
-    }
-    delete[] _pending_queues;
     _deregister_metrics();
 }
 
@@ -81,18 +77,12 @@ void ScannerScheduler::stop() {
         return;
     }
 
-    for (int i = 0; i < QUEUE_NUM; i++) {
-        _pending_queues[i]->shutdown();
-    }
-
     _is_closed = true;
 
-    _scheduler_pool->shutdown();
     _local_scan_thread_pool->shutdown();
     _remote_scan_thread_pool->shutdown();
     _limited_scan_thread_pool->shutdown();
 
-    _scheduler_pool->wait();
     _local_scan_thread_pool->join();
     _remote_scan_thread_pool->join();
     _limited_scan_thread_pool->wait();
@@ -101,24 +91,12 @@ void ScannerScheduler::stop() {
 }
 
 Status ScannerScheduler::init(ExecEnv* env) {
-    // 1. scheduling thread pool and scheduling queues
-    static_cast<void>(ThreadPoolBuilder("SchedulingThreadPool")
-                              .set_min_threads(QUEUE_NUM)
-                              .set_max_threads(QUEUE_NUM)
-                              .build(&_scheduler_pool));
-
-    _pending_queues = new 
BlockingQueue<std::shared_ptr<ScannerContext>>*[QUEUE_NUM];
-    for (int i = 0; i < QUEUE_NUM; i++) {
-        _pending_queues[i] = new 
BlockingQueue<std::shared_ptr<ScannerContext>>(INT32_MAX);
-        static_cast<void>(_scheduler_pool->submit_func([this, i] { 
this->_schedule_thread(i); }));
-    }
-
-    // 2. local scan thread pool
+    // 1. local scan thread pool
     _local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
             config::doris_scanner_thread_pool_thread_num,
             config::doris_scanner_thread_pool_queue_size, "local_scan");
 
-    // 3. remote scan thread pool
+    // 2. remote scan thread pool
     _remote_thread_pool_max_size = 
config::doris_max_remote_scanner_thread_pool_thread_num != -1
                                            ? 
config::doris_max_remote_scanner_thread_pool_thread_num
                                            : std::max(512, 
CpuInfo::num_cores() * 10);
@@ -128,7 +106,7 @@ Status ScannerScheduler::init(ExecEnv* env) {
             _remote_thread_pool_max_size, 
config::doris_remote_scanner_thread_pool_queue_size,
             "RemoteScanThreadPool");
 
-    // 4. limited scan thread pool
+    // 3. limited scan thread pool
     static_cast<void>(ThreadPoolBuilder("LimitedScanThreadPool")
                               
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
                               
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
@@ -139,155 +117,97 @@ Status ScannerScheduler::init(ExecEnv* env) {
     return Status::OK();
 }
 
-Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx) {
+void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
+                              std::shared_ptr<ScanTask> scan_task) {
+    scan_task->last_submit_time = GetCurrentTimeNanos();
     if (ctx->done()) {
-        return Status::EndOfFile("ScannerContext is done");
-    }
-    ctx->queue_idx = (_queue_idx++ % QUEUE_NUM);
-    if (!_pending_queues[ctx->queue_idx]->blocking_put(ctx)) {
-        return Status::InternalError("failed to submit scanner context to 
scheduler");
-    }
-    return Status::OK();
-}
-
-std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
-        ThreadPool::ExecutionMode mode, int max_concurrency) {
-    return _limited_scan_thread_pool->new_token(mode, max_concurrency);
-}
-
-void ScannerScheduler::_schedule_thread(int queue_id) {
-    BlockingQueue<std::shared_ptr<ScannerContext>>* queue = 
_pending_queues[queue_id];
-    while (!_is_closed) {
-        std::shared_ptr<ScannerContext> ctx;
-        bool ok = queue->blocking_get(&ctx);
-        if (!ok) {
-            // maybe closed
-            continue;
-        }
-
-        _schedule_scanners(ctx);
-        // If ctx is done, no need to schedule it again.
-        // But should notice that there may still scanners running in scanner 
pool.
+        return;
     }
-}
-
-void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) 
{
     auto task_lock = ctx->task_exec_ctx();
     if (task_lock == nullptr) {
         LOG(INFO) << "could not lock task execution context, query " << 
ctx->debug_string()
                   << " maybe finished";
         return;
     }
-    MonotonicStopWatch watch;
-    watch.reset();
-    watch.start();
-    ctx->incr_num_ctx_scheduling(1);
-
-    if (ctx->done()) {
-        return;
-    }
-
-    std::list<std::weak_ptr<ScannerDelegate>> this_run;
-    ctx->get_next_batch_of_scanners(&this_run);
-    if (this_run.empty()) {
-        // There will be 2 cases when this_run is empty:
-        // 1. The blocks queue reaches limit.
-        //      The consumer will continue scheduling the ctx.
-        // 2. All scanners are running.
-        //      There running scanner will schedule the ctx after they are 
finished.
-        // So here we just return to stop scheduling ctx.
-        return;
-    }
-
-    ctx->inc_num_running_scanners(this_run.size());
 
     // Submit scanners to thread pool
     // TODO(cmy): How to handle this "nice"?
     int nice = 1;
-    auto iter = this_run.begin();
     if (ctx->thread_token != nullptr) {
-        // TODO llj tg how to treat this?
-        while (iter != this_run.end()) {
-            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
-            if (scanner_delegate == nullptr) {
-                // Has to ++, or there is a dead loop
-                iter++;
-                continue;
-            }
-            scanner_delegate->_scanner->start_wait_worker_timer();
-            auto s = ctx->thread_token->submit_func([this, scanner_ref = 
*iter, ctx]() {
-                this->_scanner_scan(this, ctx, scanner_ref);
-            });
-            if (s.ok()) {
-                iter++;
-            } else {
-                ctx->set_status_on_error(s);
-                break;
-            }
+        std::shared_ptr<ScannerDelegate> scanner_delegate = 
scan_task->scanner.lock();
+        if (scanner_delegate == nullptr) {
+            return;
+        }
+
+        scanner_delegate->_scanner->start_wait_worker_timer();
+        auto s = ctx->thread_token->submit_func(
+                [this, scanner_ref = scan_task, ctx]() { 
this->_scanner_scan(ctx, scanner_ref); });
+        if (!s.ok()) {
+            scan_task->set_status(s);
+            ctx->append_block_to_queue(scan_task);
+            return;
         }
     } else {
-        while (iter != this_run.end()) {
-            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
-            if (scanner_delegate == nullptr) {
-                // Has to ++, or there is a dead loop
-                iter++;
-                continue;
-            }
-            scanner_delegate->_scanner->start_wait_worker_timer();
-            TabletStorageType type = 
scanner_delegate->_scanner->get_storage_type();
-            bool ret = false;
-            if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
-                if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
-                    auto work_func = [this, scanner_ref = *iter, ctx]() {
-                        this->_scanner_scan(this, ctx, scanner_ref);
-                    };
-                    SimplifiedScanTask simple_scan_task = {work_func, ctx};
-                    ret = 
scan_sche->get_scan_queue()->try_put(simple_scan_task);
-                } else {
-                    PriorityThreadPool::Task task;
-                    task.work_function = [this, scanner_ref = *iter, ctx]() {
-                        this->_scanner_scan(this, ctx, scanner_ref);
-                    };
-                    task.priority = nice;
-                    ret = _local_scan_thread_pool->offer(task);
-                }
+        std::shared_ptr<ScannerDelegate> scanner_delegate = 
scan_task->scanner.lock();
+        if (scanner_delegate == nullptr) {
+            return;
+        }
+
+        scanner_delegate->_scanner->start_wait_worker_timer();
+        TabletStorageType type = 
scanner_delegate->_scanner->get_storage_type();
+        bool ret = false;
+        if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+            if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
+                auto work_func = [this, scanner_ref = scan_task, ctx]() {
+                    this->_scanner_scan(ctx, scanner_ref);
+                };
+                SimplifiedScanTask simple_scan_task = {work_func, ctx};
+                ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
             } else {
                 PriorityThreadPool::Task task;
-                task.work_function = [this, scanner_ref = *iter, ctx]() {
-                    this->_scanner_scan(this, ctx, scanner_ref);
+                task.work_function = [this, scanner_ref = scan_task, ctx]() {
+                    this->_scanner_scan(ctx, scanner_ref);
                 };
                 task.priority = nice;
-                ret = _remote_scan_thread_pool->offer(task);
-            }
-            if (ret) {
-                iter++;
-            } else {
-                ctx->set_status_on_error(
-                        Status::InternalError("failed to submit scanner to 
scanner pool"));
-                break;
+                ret = _local_scan_thread_pool->offer(task);
             }
+        } else {
+            PriorityThreadPool::Task task;
+            task.work_function = [this, scanner_ref = scan_task, ctx]() {
+                this->_scanner_scan(ctx, scanner_ref);
+            };
+            task.priority = nice;
+            ret = _remote_scan_thread_pool->offer(task);
+        }
+        if (!ret) {
+            scan_task->set_status(
+                    Status::InternalError("Failed to submit scanner to scanner 
pool"));
+            ctx->append_block_to_queue(scan_task);
+            return;
         }
     }
-    ctx->incr_ctx_scheduling_time(watch.elapsed_time());
 }
 
-void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
-                                     std::shared_ptr<ScannerContext> ctx,
-                                     std::weak_ptr<ScannerDelegate> 
scanner_ref) {
+std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
+        ThreadPool::ExecutionMode mode, int max_concurrency) {
+    return _limited_scan_thread_pool->new_token(mode, max_concurrency);
+}
+
+void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
+                                     std::shared_ptr<ScanTask> scan_task) {
+    // record the time from scanner submission to actual execution in 
nanoseconds
+    ctx->incr_ctx_scheduling_time(GetCurrentTimeNanos() - 
scan_task->last_submit_time);
     auto task_lock = ctx->task_exec_ctx();
     if (task_lock == nullptr) {
-        // LOG(WARNING) << "could not lock task execution context, query " << 
print_id(_query_id)
-        //             << " maybe finished";
         return;
     }
-    //LOG_EVERY_N(INFO, 100) << "start running scanner from ctx " << 
ctx->debug_string();
-    // will release scanner if it is the last one, task lock is hold here, to 
ensure
-    // that scanner could call scannode's method during deconstructor
-    std::shared_ptr<ScannerDelegate> scanner_delegate = scanner_ref.lock();
-    auto& scanner = scanner_delegate->_scanner;
+
+    std::shared_ptr<ScannerDelegate> scanner_delegate = 
scan_task->scanner.lock();
     if (scanner_delegate == nullptr) {
         return;
     }
+
+    VScannerSPtr& scanner = scanner_delegate->_scanner;
     SCOPED_ATTACH_TASK(scanner->runtime_state());
     // for cpu hard limit, thread name should not be reset
     if (ctx->_should_reset_thread_name) {
@@ -310,14 +230,13 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler,
     if (!scanner->is_init()) {
         status = scanner->init();
         if (!status.ok()) {
-            ctx->set_status_on_error(status);
             eos = true;
         }
     }
+
     if (!eos && !scanner->is_open()) {
         status = scanner->open(state);
         if (!status.ok()) {
-            ctx->set_status_on_error(status);
             eos = true;
         }
         scanner->set_opened();
@@ -325,46 +244,21 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler,
 
     static_cast<void>(scanner->try_append_late_arrival_runtime_filter());
 
-    // Because we use thread pool to scan data from storage. One scanner can't
-    // use this thread too long, this can starve other query's scanner. So, we
-    // need yield this thread when we do enough work. However, OlapStorage read
-    // data in pre-aggregate mode, then we can't use storage returned data to
-    // judge if we need to yield. So we record all raw data read in this round
-    // scan, if this exceeds row number or bytes threshold, we yield this 
thread.
-    std::vector<vectorized::BlockUPtr> blocks;
-    int64_t raw_bytes_read = 0;
-    int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
-    int num_rows_in_block = 0;
-
-    // Only set to true when ctx->done() return true.
-    // Use this flag because we need distinguish eos from `should_stop`.
-    // If eos is true, we still need to return blocks,
-    // but is should_stop is true, no need to return blocks
-    bool should_stop = false;
-    // Has to wait at least one full block, or it will cause a lot of schedule 
task in priority
-    // queue, it will affect query latency and query concurrency for example 
ssb 3.3.
-    auto should_do_scan = [&, batch_size = state->batch_size(),
-                           time = state->wait_full_block_schedule_times()]() {
-        if (raw_bytes_read < raw_bytes_threshold) {
-            return true;
-        } else if (num_rows_in_block < batch_size) {
-            return raw_bytes_read < raw_bytes_threshold * time;
-        }
-        return false;
-    };
-
-    while (!eos && should_do_scan()) {
-        // TODO llj task group should should_yield?
+    bool first_read = true;
+    while (!eos) {
         if (UNLIKELY(ctx->done())) {
-            // No need to set status on error here.
-            // Because done() maybe caused by "should_stop"
-            should_stop = true;
+            eos = true;
             break;
         }
+        BlockUPtr free_block = nullptr;
+        if (first_read) {
+            status = scanner->get_block_after_projects(state, 
scan_task->current_block.get(), &eos);
+            first_read = false;
+        } else {
+            free_block = ctx->get_free_block();
+            status = scanner->get_block_after_projects(state, 
free_block.get(), &eos);
+        }
 
-        BlockUPtr block = ctx->get_free_block();
-
-        status = scanner->get_block_after_projects(state, block.get(), &eos);
         // The VFileScanner for external table may try to open not exist files,
         // Because FE file cache for external table may out of date.
         // So, NOT_FOUND for VFileScanner is not a fail case.
@@ -374,49 +268,36 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler,
                               !status.is<ErrorCode::NOT_FOUND>()))) {
             LOG(WARNING) << "Scan thread read VScanner failed: " << 
status.to_string();
             break;
-        }
-        VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << 
eos;
-        if (status.is<ErrorCode::NOT_FOUND>()) {
+        } else if (status.is<ErrorCode::NOT_FOUND>()) {
             // The only case in this "if" branch is external table file delete 
and fe cache has not been updated yet.
             // Set status to OK.
             status = Status::OK();
             eos = true;
+            break;
         }
 
-        raw_bytes_read += block->allocated_bytes();
-        num_rows_in_block += block->rows();
-        if (UNLIKELY(block->rows() == 0)) {
-            ctx->return_free_block(std::move(block));
-        } else {
-            if (!blocks.empty() && blocks.back()->rows() + block->rows() <= 
state->batch_size()) {
-                vectorized::MutableBlock mutable_block(blocks.back().get());
-                static_cast<void>(mutable_block.merge(*block));
-                
blocks.back().get()->set_columns(std::move(mutable_block.mutable_columns()));
-                ctx->return_free_block(std::move(block));
-            } else {
-                blocks.push_back(std::move(block));
-            }
+        if (!first_read && free_block) {
+            vectorized::MutableBlock 
mutable_block(scan_task->current_block.get());
+            static_cast<void>(mutable_block.merge(*free_block));
+            
scan_task->current_block->set_columns(std::move(mutable_block.mutable_columns()));
+            ctx->return_free_block(std::move(free_block));
+        }
+        if (scan_task->current_block->rows() >= ctx->batch_size()) {
+            break;
         }
     } // end for while
 
-    // if we failed, check status.
     if (UNLIKELY(!status.ok())) {
-        // _transfer_done = true;
-        ctx->set_status_on_error(status);
+        scan_task->set_status(status);
         eos = true;
-        blocks.clear();
-    } else if (should_stop) {
-        // No need to return blocks because of should_stop, just delete them
-        blocks.clear();
-    } else if (!blocks.empty()) {
-        ctx->append_blocks_to_queue(blocks);
     }
 
     scanner->update_scan_cpu_timer();
-    if (eos || should_stop) {
+    if (eos) {
         scanner->mark_to_need_to_close();
     }
-    ctx->push_back_scanner_and_reschedule(scanner_delegate);
+    scan_task->set_eos(eos);
+    ctx->append_block_to_queue(scan_task);
 }
 
 void ScannerScheduler::_register_metrics() {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index 9fedd27dbd8..7a602038956 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -37,25 +37,18 @@ class BlockingQueue;
 
 namespace doris::vectorized {
 class ScannerDelegate;
+class ScanTask;
 class ScannerContext;
 
 // Responsible for the scheduling and execution of all Scanners of a BE node.
-// ScannerScheduler has two types of thread pools:
-// 1. Scheduling thread pool
-//     Responsible for Scanner scheduling.
-//     A set of Scanners for a query will be encapsulated into a ScannerContext
-//     and submitted to the ScannerScheduler's scheduling queue.
-//     There are multiple scheduling queues in ScannerScheduler, and each 
scheduling queue
-//     is handled by a scheduling thread.
-//     The scheduling thread is scheduled in granularity of ScannerContext,
-//     that is, a group of Scanners in a ScannerContext are scheduled at a 
time.
-//
-//2. Execution thread pool
-//     The scheduling thread will submit the Scanners selected from the 
ScannerContext
+// Execution thread pool
+//     When a ScannerContext is launched, it will submit the running scanners 
to this scheduler.
+//     The scheduling thread will submit the running scanner and its 
ScannerContext
 //     to the execution thread pool to do the actual scan task.
-//     Each Scanner will act as a producer, read a group of blocks and put 
them into
+//     Each Scanner will act as a producer, read the next block and put it into
 //     the corresponding block queue.
 //     The corresponding ScanNode will act as a consumer to consume blocks 
from the block queue.
+//     After the block is consumed, the unfinished scanner will be resubmitted to 
this scheduler.
 class ScannerScheduler {
 public:
     ScannerScheduler();
@@ -63,7 +56,7 @@ public:
 
     [[nodiscard]] Status init(ExecEnv* env);
 
-    [[nodiscard]] Status submit(std::shared_ptr<ScannerContext> ctx);
+    void submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> 
scan_task);
 
     void stop();
 
@@ -73,32 +66,13 @@ public:
     int remote_thread_pool_max_size() const { return 
_remote_thread_pool_max_size; }
 
 private:
-    // scheduling thread function
-    void _schedule_thread(int queue_id);
-    // schedule scanners in a certain ScannerContext
-    void _schedule_scanners(std::shared_ptr<ScannerContext> ctx);
-    // execution thread function
-    void _scanner_scan(ScannerScheduler* scheduler, 
std::shared_ptr<ScannerContext> ctx,
-                       std::weak_ptr<ScannerDelegate> scanner);
+    static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
+                              std::shared_ptr<ScanTask> scan_task);
 
     void _register_metrics();
 
     static void _deregister_metrics();
 
-    // Scheduling queue number.
-    // TODO: make it configurable.
-    static const int QUEUE_NUM = 4;
-    // The ScannerContext will be submitted to the pending queue roundrobin.
-    // _queue_idx pointer to the current queue.
-    // Use std::atomic_uint to prevent numerical overflow from memory out of 
bound.
-    // The scheduler thread will take ctx from pending queue, schedule it,
-    // and put it to the _scheduling_map.
-    // If any scanner finish, it will take ctx from and put it to pending 
queue again.
-    std::atomic_uint _queue_idx = {0};
-    BlockingQueue<std::shared_ptr<ScannerContext>>** _pending_queues = nullptr;
-
-    // scheduling thread pool
-    std::unique_ptr<ThreadPool> _scheduler_pool;
     // execution thread pool
     // _local_scan_thread_pool is for local scan task(typically, olap scanner)
     // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, 
etc.)
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 568b206db5e..f2f4e242bc6 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -197,7 +197,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
             if (_scanner_ctx) {
                 DCHECK(!_eos && _num_scanners->value() > 0);
                 RETURN_IF_ERROR(_scanner_ctx->init());
-                
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
             }
             if (_shared_scan_opt) {
                 LOG(INFO) << "instance shared scan enabled"
@@ -219,7 +218,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
                               : Status::OK());
         if (_scanner_ctx) {
             RETURN_IF_ERROR(_scanner_ctx->init());
-            
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
         }
     }
 
@@ -246,14 +244,10 @@ Status VScanNode::get_next(RuntimeState* state, 
vectorized::Block* block, bool*
     }};
 
     if (state->is_cancelled()) {
-        // ISSUE: https://github.com/apache/doris/issues/16360
-        // _scanner_ctx may be null here, see: `VScanNode::alloc_resource` 
(_eos == null)
         if (_scanner_ctx) {
-            _scanner_ctx->set_status_on_error(Status::Cancelled("query 
cancelled"));
-            return _scanner_ctx->status();
-        } else {
-            return Status::Cancelled("query cancelled");
+            _scanner_ctx->stop_scanners(state);
         }
+        return Status::Cancelled("Query cancelled in ScanNode");
     }
 
     if (_eos) {
@@ -261,16 +255,7 @@ Status VScanNode::get_next(RuntimeState* state, 
vectorized::Block* block, bool*
         return Status::OK();
     }
 
-    vectorized::BlockUPtr scan_block = nullptr;
-    RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, &scan_block, 
eos, _context_queue_id));
-    if (*eos) {
-        DCHECK(scan_block == nullptr);
-        return Status::OK();
-    }
-
-    // get scanner's block memory
-    block->swap(*scan_block);
-    _scanner_ctx->return_free_block(std::move(scan_block));
+    RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, block, eos, 
_context_queue_id));
 
     reached_limit(block, eos);
     if (*eos) {
@@ -294,16 +279,14 @@ Status VScanNode::_init_profile() {
     runtime_profile()->add_child(_scanner_profile.get(), true, nullptr);
 
     _memory_usage_counter = ADD_LABEL_COUNTER(_scanner_profile, "MemoryUsage");
-    _queued_blocks_memory_usage =
-            _scanner_profile->AddHighWaterMarkCounter("QueuedBlocks", 
TUnit::BYTES, "MemoryUsage");
     _free_blocks_memory_usage =
             _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", 
TUnit::BYTES, "MemoryUsage");
     _newly_create_free_blocks_num =
             ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", 
TUnit::UNIT);
+    _scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, 
"NumScaleUpScanners", TUnit::UNIT);
     // time of transfer thread to wait for block from scan thread
     _scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, 
"ScannerBatchWaitTime");
     _scanner_sched_counter = ADD_COUNTER(_scanner_profile, 
"ScannerSchedCount", TUnit::UNIT);
-    _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, 
"ScannerCtxSchedCount", TUnit::UNIT);
     _scanner_ctx_sched_time = ADD_TIMER(_scanner_profile, 
"ScannerCtxSchedTime");
 
     _scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index 0c39d15b57f..b83e9211214 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -377,7 +377,6 @@ protected:
     RuntimeProfile::Counter* _filter_timer = nullptr;
 
     RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
-    RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
     RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
     RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
@@ -387,8 +386,8 @@ protected:
     RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
 
     RuntimeProfile::Counter* _memory_usage_counter = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = 
nullptr;
     RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
+    RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
 
     std::unordered_map<std::string, int> _colname_to_slot_id;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8b5e24a52aa..8effd6adae2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -74,6 +74,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
     public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
+    public static final String NUM_SCANNER_THREADS = "num_scanner_threads";
+    public static final String SCANNER_SCALE_UP_RATIO = 
"scanner_scale_up_ratio";
     public static final String QUERY_TIMEOUT = "query_timeout";
     public static final String ANALYZE_TIMEOUT = "analyze_timeout";
 
@@ -553,6 +555,20 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT)
     public long maxScanQueueMemByte = 2147483648L / 20;
 
+    @VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true, 
description = {
+            "ScanNode扫描数据的最大并发,默认为0,采用BE的doris_scanner_thread_pool_thread_num",
+            "The max threads to read data of ScanNode, "
+                    + "default 0, use doris_scanner_thread_pool_thread_num in 
be.conf"
+    })
+    public int numScannerThreads = 0;
+
+    @VariableMgr.VarAttr(name = SCANNER_SCALE_UP_RATIO, needForward = true, 
description = {
+            "ScanNode自适应的增加扫描并发,最大允许增长的并发倍率,默认为0,关闭该功能",
+            "The max multiple of increasing the concurrency of scanners 
adaptively, "
+                    + "default 0, turn off scaling up"
+    })
+    public double scannerScaleUpRatio = 0;
+
     @VariableMgr.VarAttr(name = ENABLE_SPILLING)
     public boolean enableSpilling = false;
 
@@ -1790,6 +1806,14 @@ public class SessionVariable implements Serializable, 
Writable {
         return maxScanQueueMemByte;
     }
 
+    public int getNumScannerThreads() {
+        return numScannerThreads;
+    }
+
+    public double getScannerScaleUpRatio() {
+        return scannerScaleUpRatio;
+    }
+
     public int getQueryTimeoutS() {
         return queryTimeoutS;
     }
@@ -1962,7 +1986,15 @@ public class SessionVariable implements Serializable, 
Writable {
     }
 
     public void setMaxScanQueueMemByte(long scanQueueMemByte) {
-        this.maxScanQueueMemByte = Math.min(scanQueueMemByte, maxExecMemByte / 
2);
+        this.maxScanQueueMemByte = scanQueueMemByte;
+    }
+
+    public void setNumScannerThreads(int numScannerThreads) {
+        this.numScannerThreads = numScannerThreads;
+    }
+
+    public void setScannerScaleUpRatio(double scannerScaleUpRatio) {
+        this.scannerScaleUpRatio = scannerScaleUpRatio;
     }
 
     public boolean isSqlQuoteShowCreate() {
@@ -2771,7 +2803,9 @@ public class SessionVariable implements Serializable, 
Writable {
     public TQueryOptions toThrift() {
         TQueryOptions tResult = new TQueryOptions();
         tResult.setMemLimit(maxExecMemByte);
-        tResult.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, 
maxExecMemByte / 20));
+        tResult.setScanQueueMemLimit(maxScanQueueMemByte);
+        tResult.setNumScannerThreads(numScannerThreads);
+        tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
 
         // TODO chenhao, reservation will be calculated by cost
         tResult.setMinReservation(0);
@@ -3067,7 +3101,9 @@ public class SessionVariable implements Serializable, 
Writable {
     public TQueryOptions getQueryOptionVariables() {
         TQueryOptions queryOptions = new TQueryOptions();
         queryOptions.setMemLimit(maxExecMemByte);
-        queryOptions.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, 
maxExecMemByte / 20));
+        queryOptions.setScanQueueMemLimit(maxScanQueueMemByte);
+        queryOptions.setNumScannerThreads(numScannerThreads);
+        queryOptions.setScannerScaleUpRatio(scannerScaleUpRatio);
         queryOptions.setQueryTimeout(queryTimeoutS);
         queryOptions.setInsertTimeout(insertTimeoutS);
         queryOptions.setAnalyzeTimeout(analyzeTimeoutS);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index b0e3a50628b..c321aa2660e 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -271,6 +271,8 @@ struct TQueryOptions {
   97: optional i64 parallel_scan_min_rows_per_scanner = 0;
 
   98: optional bool skip_bad_tablet = false;
+  // Increase concurrency of scanners adaptively, the maximum times to scale up
+  99: optional double scanner_scale_up_ratio = 0;
 
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to