This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cec5691bad [test](scanner) Scanner scheduler unit test (#47783)
5cec5691bad is described below

commit 5cec5691badb1e343b6d2d26e49339fe8e56a9c9
Author: zhiqiang <hezhiqi...@selectdb.com>
AuthorDate: Tue Feb 18 23:34:04 2025 +0800

    [test](scanner) Scanner scheduler unit test (#47783)
    
    ### What problem does this PR solve?
    
    Unit tests for the scanner scheduler. Adaptive scan scheduling was introduced by
    https://github.com/apache/doris/pull/44690
    
    * ScannerContext::init
    * ScannerContext::_push_back_scan_task
    * ScannerContext::_get_margin
    * ScannerContext::_pull_next_scan_task
    * ScannerContext::_schedule_scan_task
    * Additional test for scan operator, make sure
    `adaptive_pipeline_task_serial_read_on_limit` is working correctly.
    * ScannerContext::get_free_block
    * ScannerContext::return_free_block
    * ScannerContext::get_block_from_queue
---
 be/src/runtime/runtime_state.h                |   9 +-
 be/src/vec/core/block.h                       |   7 +-
 be/src/vec/exec/scan/scanner_context.cpp      |  89 +--
 be/src/vec/exec/scan/scanner_scheduler.h      |  22 +-
 be/src/vec/exec/scan/vscanner.cpp             |   3 +-
 be/test/scan/mock_scanner_scheduler.h         |  30 +
 be/test/scan/mock_simplified_scan_scheduler.h |  34 ++
 be/test/scan/scanner_context_test.cpp         | 847 ++++++++++++++++++++++++++
 be/test/vec/exec/scan_operator_test.cpp       | 114 ++++
 9 files changed, 1104 insertions(+), 51 deletions(-)

diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 0fb53170c02..bc7de94d8a6 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -115,6 +115,11 @@ public:
                                                            : 
_query_options.mem_limit / 20;
     }
 
+    int32_t max_column_reader_num() const {
+        return _query_options.__isset.max_column_reader_num ? 
_query_options.max_column_reader_num
+                                                            : 20000;
+    }
+
     ObjectPool* obj_pool() const { return _obj_pool.get(); }
 
     const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
@@ -215,8 +220,8 @@ public:
     // _unreported_error_idx to _errors_log.size()
     void get_unreported_errors(std::vector<std::string>* new_errors);
 
-    [[nodiscard]] bool is_cancelled() const;
-    Status cancel_reason() const;
+    [[nodiscard]] MOCK_FUNCTION bool is_cancelled() const;
+    MOCK_FUNCTION Status cancel_reason() const;
     void cancel(const Status& reason) {
         if (_exec_status.update(reason)) {
             // Create a error status, so that we could print error stack, and
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index c15d9dc681f..8f2d56f5de3 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -34,6 +34,7 @@
 #include <utility>
 #include <vector>
 
+#include "common/be_mock_util.h"
 #include "common/exception.h"
 #include "common/factory_creator.h"
 #include "common/status.h"
@@ -91,7 +92,7 @@ public:
     Block(const std::vector<SlotDescriptor>& slots, size_t block_size,
           bool ignore_trivial_slot = false);
 
-    ~Block() = default;
+    MOCK_FUNCTION ~Block() = default;
     Block(const Block& block) = default;
     Block& operator=(const Block& p) = default;
     Block(Block&& block) = default;
@@ -209,7 +210,7 @@ public:
     std::string columns_bytes() const;
 
     /// Approximate number of allocated bytes in memory - for profiling and 
limits.
-    size_t allocated_bytes() const;
+    MOCK_FUNCTION size_t allocated_bytes() const;
 
     /** Get a list of column names separated by commas. */
     std::string dump_names() const;
@@ -253,7 +254,7 @@ public:
     // Else clear column [0, column_size) delete column [column_size, 
data.size)
     void clear_column_data(int64_t column_size = -1) noexcept;
 
-    bool mem_reuse() { return !data.empty(); }
+    MOCK_FUNCTION bool mem_reuse() { return !data.empty(); }
 
     bool is_empty_column() { return data.empty(); }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 8cd2b843f4c..74d5cdc21b3 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -71,9 +71,13 @@ ScannerContext::ScannerContext(
           _parallism_of_scan_operator(parallism_of_scan_operator),
           
_min_scan_concurrency_of_scan_scheduler(_state->min_scan_concurrency_of_scan_scheduler()),
           _min_scan_concurrency(_state->min_scan_concurrency_of_scanner()) {
+    DCHECK(_state != nullptr);
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
+#ifndef BE_TEST
     _query_id = _state->get_query_ctx()->query_id();
+    _resource_ctx = _state->get_query_ctx()->resource_ctx();
+#endif
     ctx_id = UniqueId::gen_uid().to_string();
     for (auto& scanner : _all_scanners) {
         _pending_scanners.push(scanner);
@@ -81,21 +85,17 @@ ScannerContext::ScannerContext(
     if (limit < 0) {
         limit = -1;
     }
-    _resource_ctx = _state->get_query_ctx()->resource_ctx();
     _dependency = dependency;
-    if (_min_scan_concurrency_of_scan_scheduler == 0) {
-        _min_scan_concurrency_of_scan_scheduler = 2 * 
config::doris_scanner_thread_pool_thread_num;
-    }
     DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
 }
 
 // After init function call, should not access _parent
 Status ScannerContext::init() {
+#ifndef BE_TEST
     _scanner_profile = _local_state->_scanner_profile;
     _newly_create_free_blocks_num = 
_local_state->_newly_create_free_blocks_num;
     _scanner_memory_used_counter = _local_state->_memory_used_counter;
 
-#ifndef BE_TEST
     // 3. get thread token
     if (!_state->get_query_ctx()) {
         return Status::InternalError("Query context of {} is not set",
@@ -108,26 +108,13 @@ Status ScannerContext::init() {
         _should_reset_thread_name = false;
     }
 
-#endif
     _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
                                                     thread_token == nullptr ? 
"False" : "True");
 
-    // _max_bytes_in_queue controls the maximum memory that can be used by a 
single scan instance.
-    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make 
sure its actual value
-    // is larger than 10MB.
-    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), 
(int64_t)1024 * 1024 * 10);
-
-    // Provide more memory for wide tables, increase proportionally by 
multiples of 300
-    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
-
-    // TODO: Where is the proper position to place this code?
-    if (_all_scanners.empty()) {
-        _is_finished = true;
-        _set_scanner_done();
-    }
-
     auto scanner = _all_scanners.front().lock();
     DCHECK(scanner != nullptr);
+
+    // TODO: Maybe need refactor.
     // A query could have remote scan task and local scan task at the same 
time.
     // So we need to compute the _scanner_scheduler in each scan operator 
instead of query context.
     SimplifiedScanScheduler* simple_scan_scheduler = 
_state->get_query_ctx()->get_scan_scheduler();
@@ -148,6 +135,24 @@ Status ScannerContext::init() {
             _scanner_scheduler = 
_scanner_scheduler_global->get_remote_scan_thread_pool();
         }
     }
+#endif
+    // _max_bytes_in_queue controls the maximum memory that can be used by a 
single scan operator.
+    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make 
sure its actual value
+    // is larger than 10MB.
+    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), 
(int64_t)1024 * 1024 * 10);
+
+    // Provide more memory for wide tables, increase proportionally by 
multiples of 300
+    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
+
+    if (_min_scan_concurrency_of_scan_scheduler == 0) {
+        // _scanner_scheduler->get_max_threads() is setted by workload group.
+        _min_scan_concurrency_of_scan_scheduler = 2 * 
_scanner_scheduler->get_max_threads();
+    }
+
+    if (_all_scanners.empty()) {
+        _is_finished = true;
+        _set_scanner_done();
+    }
 
     // The overall target of our system is to make full utilization of the 
resources.
     // At the same time, we dont want too many tasks are queued by scheduler, 
that is not necessary.
@@ -155,7 +160,6 @@ Status ScannerContext::init() {
     // So that for a single query, we can make sure it could make full 
utilization of the resource.
     _max_scan_concurrency = _state->num_scanner_threads();
     if (_max_scan_concurrency == 0) {
-        // TODO: Add unit test.
         // Why this is safe:
         /*
             1. If num cpu cores is less than or equal to 24:
@@ -172,11 +176,6 @@ Status ScannerContext::init() {
         */
         _max_scan_concurrency =
                 _min_scan_concurrency_of_scan_scheduler / 
_parallism_of_scan_operator;
-        // In some rare cases, user may set parallel_pipeline_task_num to 1 
handly to make many query could be executed
-        // in parallel. We need to make sure the _max_thread_num is smaller 
than previous value in this situation.
-        _max_scan_concurrency =
-                std::min(_max_scan_concurrency, 
config::doris_scanner_thread_pool_thread_num);
-
         _max_scan_concurrency = _max_scan_concurrency == 0 ? 1 : 
_max_scan_concurrency;
     }
 
@@ -185,7 +184,7 @@ Status ScannerContext::init() {
     // when user not specify scan_thread_num, so we can try downgrade 
_max_thread_num.
     // becaue we found in a table with 5k columns, column reader may ocuppy 
too much memory.
     // you can refer https://github.com/apache/doris/issues/35340 for details.
-    int32_t max_column_reader_num = 
_state->query_options().max_column_reader_num;
+    const int32_t max_column_reader_num = _state->max_column_reader_num();
 
     if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
         int32_t scan_column_num = _output_tuple_desc->slots().size();
@@ -478,6 +477,15 @@ int32_t 
ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
             _min_scan_concurrency_of_scan_scheduler -
             (_scanner_scheduler->get_active_threads() + 
_scanner_scheduler->get_queue_size());
 
+    // Remaing margin is less than _parallism_of_scan_operator of this 
ScanNode.
+    if (margin_2 > 0 && margin_2 < _parallism_of_scan_operator) {
+        // Each scan operator will at most one scanner.
+        margin_2 = 1;
+    } else {
+        // The margin is distributed evenly to each scan operator.
+        margin_2 = margin_2 / _parallism_of_scan_operator;
+    }
+
     if (margin_1 <= 0 && margin_2 <= 0) {
         return 0;
     }
@@ -500,6 +508,11 @@ int32_t 
ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
 Status ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> 
current_scan_task,
                                            std::unique_lock<std::mutex>& 
transfer_lock,
                                            
std::unique_lock<std::shared_mutex>& scheduler_lock) {
+    if (current_scan_task &&
+        (!current_scan_task->cached_blocks.empty() || 
current_scan_task->is_eos())) {
+        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler 
logical error.");
+    }
+
     std::list<std::shared_ptr<ScanTask>> tasks_to_submit;
 
     int32_t margin = _get_margin(transfer_lock, scheduler_lock);
@@ -509,12 +522,6 @@ Status 
ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> current_sca
         // Be careful with current scan task.
         // We need to add it back to task queue to make sure it could be 
resubmitted.
         if (current_scan_task) {
-            DCHECK(current_scan_task->cached_blocks.empty());
-            DCHECK(!current_scan_task->is_eos());
-            if (!current_scan_task->cached_blocks.empty() || 
current_scan_task->is_eos()) {
-                throw doris::Exception(ErrorCode::INTERNAL_ERROR,
-                                       "Scanner schduler logical error.");
-            }
             // This usually happens when we should downgrade the concurrency.
             _pending_scanners.push(current_scan_task->scanner);
             VLOG_DEBUG << fmt::format(
@@ -522,6 +529,16 @@ Status 
ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> current_sca
                     "{}, _num_scheduled_scanners {}",
                     ctx_id, _tasks_queue.size(), _num_scheduled_scanners);
         }
+
+#ifndef NDEBUG
+        // This DCHECK is necessary.
+        // We need to make sure each scan operator could have at least 1 scan 
tasks.
+        // Or this scan operator will not be re-scheduled.
+        if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && 
_tasks_queue.empty()) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner 
scheduler logical error.");
+        }
+#endif
+
         return Status::OK();
     }
 
@@ -546,7 +563,7 @@ Status 
ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> current_sca
                     if (!current_scan_task->cached_blocks.empty() || 
current_scan_task->is_eos()) {
                         // This should not happen.
                         throw doris::Exception(ErrorCode::INTERNAL_ERROR,
-                                               "Scanner schduler logical 
error.");
+                                               "Scanner scheduler logical 
error.");
                     }
                     // Current scan task is not eos, but we can not resubmit 
it.
                     // Add current_scan_task back to task queue, so that we 
have chance to resubmit it in the future.
@@ -595,11 +612,9 @@ std::shared_ptr<ScanTask> 
ScannerContext::_pull_next_scan_task(
     }
 
     if (current_scan_task != nullptr) {
-        DCHECK(current_scan_task->cached_blocks.empty());
-        DCHECK(!current_scan_task->is_eos());
         if (!current_scan_task->cached_blocks.empty() || 
current_scan_task->is_eos()) {
             // This should not happen.
-            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner 
schduler logical error.");
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner 
scheduler logical error.");
         }
         return current_scan_task;
     }
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index 9202f845345..b6d905eacd9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -21,6 +21,7 @@
 #include <cstdint>
 #include <memory>
 
+#include "common/be_mock_util.h"
 #include "common/status.h"
 #include "util/threadpool.h"
 
@@ -53,11 +54,12 @@ class SimplifiedScanScheduler;
 class ScannerScheduler {
 public:
     ScannerScheduler();
-    ~ScannerScheduler();
+    virtual ~ScannerScheduler();
 
     [[nodiscard]] Status init(ExecEnv* env);
 
-    Status submit(std::shared_ptr<ScannerContext> ctx, 
std::shared_ptr<ScanTask> scan_task);
+    MOCK_FUNCTION Status submit(std::shared_ptr<ScannerContext> ctx,
+                                std::shared_ptr<ScanTask> scan_task);
 
     void stop();
 
@@ -119,8 +121,10 @@ public:
               _sched_name(sched_name),
               _workload_group(workload_group) {}
 
-    ~SimplifiedScanScheduler() {
+    MOCK_FUNCTION ~SimplifiedScanScheduler() {
+#ifndef BE_TEST
         stop();
+#endif
         LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
     }
 
@@ -201,15 +205,17 @@ public:
         }
     }
 
-    int get_queue_size() { return _scan_thread_pool->get_queue_size(); }
+    MOCK_FUNCTION int get_queue_size() { return 
_scan_thread_pool->get_queue_size(); }
 
-    int get_active_threads() { return _scan_thread_pool->num_active_threads(); 
}
+    MOCK_FUNCTION int get_active_threads() { return 
_scan_thread_pool->num_active_threads(); }
+
+    int get_max_threads() { return _scan_thread_pool->max_threads(); }
 
     std::vector<int> thread_debug_info() { return 
_scan_thread_pool->debug_info(); }
 
-    Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
-                              std::shared_ptr<ScanTask> current_scan_task,
-                              std::unique_lock<std::mutex>& transfer_lock);
+    MOCK_FUNCTION Status schedule_scan_task(std::shared_ptr<ScannerContext> 
scanner_ctx,
+                                            std::shared_ptr<ScanTask> 
current_scan_task,
+                                            std::unique_lock<std::mutex>& 
transfer_lock);
 
 private:
     std::unique_ptr<ThreadPool> _scan_thread_pool;
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 5baf2ae9dad..ff928732510 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -246,8 +246,9 @@ Status VScanner::close(RuntimeState* state) {
     if (_is_closed) {
         return Status::OK();
     }
-
+#ifndef BE_TEST
     COUNTER_UPDATE(_local_state->_scanner_wait_worker_timer, 
_scanner_wait_worker_timer);
+#endif
     _is_closed = true;
     return Status::OK();
 }
diff --git a/be/test/scan/mock_scanner_scheduler.h 
b/be/test/scan/mock_scanner_scheduler.h
new file mode 100644
index 00000000000..2033a105b81
--- /dev/null
+++ b/be/test/scan/mock_scanner_scheduler.h
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+
+#include "common/status.h"
+#include "vec/exec/scan/scanner_scheduler.h"
+
+namespace doris::vectorized {
+class MockScannerScheduler : ScannerScheduler {
+public:
+    MockScannerScheduler() = default;
+
+    MOCK_METHOD2(submit, Status(std::shared_ptr<ScannerContext>, 
std::shared_ptr<ScanTask>));
+};
+} // namespace doris::vectorized
diff --git a/be/test/scan/mock_simplified_scan_scheduler.h 
b/be/test/scan/mock_simplified_scan_scheduler.h
new file mode 100644
index 00000000000..6a139ac7ae6
--- /dev/null
+++ b/be/test/scan/mock_simplified_scan_scheduler.h
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+
+#include "vec/exec/scan/scanner_scheduler.h"
+
+namespace doris::vectorized {
+class MockSimplifiedScanScheduler : SimplifiedScanScheduler {
+public:
+    MockSimplifiedScanScheduler(std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
+            : SimplifiedScanScheduler("ForTest", cgroup_cpu_ctl) {}
+
+    MOCK_METHOD0(get_active_threads, int());
+    MOCK_METHOD0(get_queue_size, int());
+    MOCK_METHOD3(schedule_scan_task, Status(std::shared_ptr<ScannerContext> 
scanner_ctx,
+                                            std::shared_ptr<ScanTask> 
current_scan_task,
+                                            std::unique_lock<std::mutex>& 
transfer_lock));
+};
+} // namespace doris::vectorized
diff --git a/be/test/scan/scanner_context_test.cpp 
b/be/test/scan/scanner_context_test.cpp
new file mode 100644
index 00000000000..f482fd500e9
--- /dev/null
+++ b/be/test/scan/scanner_context_test.cpp
@@ -0,0 +1,847 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/scanner_context.h"
+
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/Metrics_types.h>
+#include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gtest/gtest.h>
+
+#include <list>
+#include <memory>
+#include <mutex>
+#include <tuple>
+
+#include "common/object_pool.h"
+#include "mock_scanner_scheduler.h"
+#include "mock_simplified_scan_scheduler.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/olap_scan_operator.h"
+#include "runtime/descriptors.h"
+#include "vec/core/block.h"
+#include "vec/exec/scan/new_olap_scanner.h"
+#include "vec/exec/scan/scanner_scheduler.h"
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris::vectorized {
+class ScannerContextTest : public testing::Test {
+public:
+    void SetUp() override {
+        obj_pool = std::make_unique<ObjectPool>();
+        // This ScanNode has two tuples.
+        // First one is input tuple, second one is output tuple.
+        tnode.row_tuples.push_back(TTupleId(0));
+        tnode.row_tuples.push_back(TTupleId(1));
+        std::vector<bool> null_map {false, false};
+        tnode.nullable_tuples = null_map;
+        tbl_desc.tableType = TTableType::OLAP_TABLE;
+
+        tuple_desc.id = 0;
+        tuple_descs.push_back(tuple_desc);
+        tuple_desc.id = 1;
+        tuple_descs.push_back(tuple_desc);
+
+        type_node.type = TTypeNodeType::SCALAR;
+
+        scalar_type.__set_type(TPrimitiveType::STRING);
+        type_node.__set_scalar_type(scalar_type);
+        slot_desc.slotType.types.push_back(type_node);
+        slot_desc.id = 0;
+        slot_desc.parent = 0;
+        slot_descs.push_back(slot_desc);
+        slot_desc.id = 1;
+        slot_desc.parent = 1;
+        slot_descs.push_back(slot_desc);
+        thrift_tbl.tableDescriptors.push_back(tbl_desc);
+        thrift_tbl.tupleDescriptors = tuple_descs;
+        thrift_tbl.slotDescriptors = slot_descs;
+        std::ignore = DescriptorTbl::create(obj_pool.get(), thrift_tbl, 
&descs);
+        auto task_exec_ctx = std::make_shared<TaskExecutionContext>();
+        state->set_task_execution_context(task_exec_ctx);
+        output_tuple_desc = descs->get_tuple_descriptor(0);
+    }
+
+private:
+    class MockBlock : public Block {
+        MockBlock() = default;
+        MOCK_CONST_METHOD0(allocated_bytes, size_t());
+        MOCK_METHOD0(mem_reuse, bool());
+        MOCK_METHOD1(clear_column_data, void(int64_t));
+    };
+
+    class MockRuntimeState : public RuntimeState {
+        MockRuntimeState() = default;
+        MOCK_CONST_METHOD0(is_cancelled, bool());
+        MOCK_CONST_METHOD0(cancel_reason, Status());
+    };
+
+    std::unique_ptr<ObjectPool> obj_pool;
+    TPlanNode tnode;
+    TTableDescriptor tbl_desc;
+    std::vector<TTupleDescriptor> tuple_descs;
+    TTupleDescriptor tuple_desc;
+    std::vector<TSlotDescriptor> slot_descs;
+    TSlotDescriptor slot_desc;
+    TTypeNode type_node;
+    TScalarType scalar_type;
+    TDescriptorTable thrift_tbl;
+    DescriptorTbl* descs = nullptr;
+    std::unique_ptr<RuntimeState> state = std::make_unique<RuntimeState>();
+    std::unique_ptr<RuntimeProfile> profile = 
std::make_unique<RuntimeProfile>("TestProfile");
+    std::unique_ptr<RuntimeProfile::Counter> max_concurrency_counter =
+            std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
+    std::unique_ptr<RuntimeProfile::Counter> min_concurrency_counter =
+            std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
+
+    std::unique_ptr<RuntimeProfile::Counter> newly_create_free_blocks_num =
+            std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
+    std::unique_ptr<RuntimeProfile::Counter> scanner_memory_used_counter =
+            std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
+
+    TupleDescriptor* output_tuple_desc = nullptr;
+    RowDescriptor* output_row_descriptor = nullptr;
+    std::shared_ptr<pipeline::Dependency> scan_dependency =
+            pipeline::Dependency::create_shared(0, 0, "TestScanDependency");
+    std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
std::make_shared<CgroupV2CpuCtl>(1);
+    std::unique_ptr<SimplifiedScanScheduler> scan_scheduler =
+            std::make_unique<SimplifiedScanScheduler>("ForTest", 
cgroup_cpu_ctl);
+};
+
+TEST_F(ScannerContextTest, test_init) {
+    const int parallel_tasks = 1;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 11; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    scan_operator->_should_run_serial = false;
+
+    olap_scan_local_state->_max_scan_concurrency = 
max_concurrency_counter.get();
+    olap_scan_local_state->_min_scan_concurrency = 
min_concurrency_counter.get();
+
+    olap_scan_local_state->_parent = scan_operator.get();
+
+    // User specified num_scanner_threads is less than _max_scan_concurrency 
that we calculated
+    TQueryOptions query_options;
+    query_options.__set_num_scanner_threads(2);
+    query_options.__set_max_column_reader_num(0);
+    state->set_query_options(query_options);
+    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
+            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, 
testing::_))
+            .WillRepeatedly(testing::Return(Status::OK()));
+    scanner_context->_scanner_scheduler = scheduler.get();
+
+    // max_scan_concurrency that we calculate will be 10 / 1 = 10;
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 10;
+    Status st = scanner_context->init();
+    ASSERT_TRUE(st.ok());
+    // actual max_scan_concurrency will be 2 since user specified 
num_scanner_threads is 2.
+    ASSERT_EQ(scanner_context->_max_scan_concurrency, 2);
+
+    query_options.__set_num_scanner_threads(0);
+    state->set_query_options(query_options);
+
+    st = scanner_context->init();
+    ASSERT_TRUE(st.ok());
+
+    ASSERT_EQ(scanner_context->_max_scan_concurrency,
+              scanner_context->_min_scan_concurrency_of_scan_scheduler / 
parallel_tasks);
+}
+
+TEST_F(ScannerContextTest, test_serial_run) {
+    const int parallel_tasks = 1;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 11; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    scan_operator->_should_run_serial = true;
+
+    olap_scan_local_state->_max_scan_concurrency = 
max_concurrency_counter.get();
+    olap_scan_local_state->_min_scan_concurrency = 
min_concurrency_counter.get();
+
+    olap_scan_local_state->_parent = scan_operator.get();
+
+    TQueryOptions query_options;
+    query_options.__set_num_scanner_threads(2);
+    query_options.__set_max_column_reader_num(0);
+    state->set_query_options(query_options);
+    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
+            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, 
testing::_))
+            .WillRepeatedly(testing::Return(Status::OK()));
+    scanner_context->_scanner_scheduler = scheduler.get();
+
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 10;
+    Status st = scanner_context->init();
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(scanner_context->_max_scan_concurrency, 1);
+
+    query_options.__set_num_scanner_threads(0);
+    state->set_query_options(query_options);
+    st = scanner_context->init();
+    ASSERT_TRUE(st.ok());
+
+    ASSERT_EQ(scanner_context->_max_scan_concurrency, 1);
+}
+
+TEST_F(ScannerContextTest, test_max_column_reader_num) {
+    const int parallel_tasks = 1;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 20; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    scan_operator->_should_run_serial = false;
+
+    olap_scan_local_state->_max_scan_concurrency = 
max_concurrency_counter.get();
+    olap_scan_local_state->_min_scan_concurrency = 
min_concurrency_counter.get();
+
+    olap_scan_local_state->_parent = scan_operator.get();
+
+    TQueryOptions query_options;
+    query_options.__set_num_scanner_threads(20);
+    query_options.__set_max_column_reader_num(1);
+    state->set_query_options(query_options);
+    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
+            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, 
testing::_))
+            .WillRepeatedly(testing::Return(Status::OK()));
+    scanner_context->_scanner_scheduler = scheduler.get();
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 10;
+    Status st = scanner_context->init();
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(scanner_context->_max_scan_concurrency, 1);
+}
+
+TEST_F(ScannerContextTest, test_push_back_scan_task) {
+    const int parallel_tasks = 1;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 11; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    scanner_context->_num_scheduled_scanners = 11;
+
+    for (int i = 0; i < 5; ++i) {
+        auto scan_task = 
std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
+        scanner_context->push_back_scan_task(scan_task);
+        ASSERT_EQ(scanner_context->_num_scheduled_scanners, 10 - i);
+    }
+}
+
+TEST_F(ScannerContextTest, get_margin) {
+    const int parallel_tasks = 4;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 11; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    std::mutex transfer_mutex;
+    std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
+    std::shared_mutex scheduler_mutex;
+    std::unique_lock<std::shared_mutex> scheduler_lock(scheduler_mutex);
+    scanner_context->_scanner_scheduler = scan_scheduler.get();
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
+    // _tasks_queue.size() is 0.
+    // _num_scheduled_scanners is 0.
+    std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
std::make_shared<CgroupV2CpuCtl>(1);
+
+    // No scan tasks have been submitted yet.
+    // ScanScheduler is empty too.
+    // So margin should be equal to _min_scan_concurrency_of_scan_scheduler / 
parallel_tasks.
+    // We can make full utilization of the resource.
+    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
+            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(0));
+    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(0));
+    scanner_context->_scanner_scheduler = scheduler.get();
+    int32_t margin = scanner_context->_get_margin(transfer_lock, 
scheduler_lock);
+
+    ASSERT_EQ(margin, scanner_context->_min_scan_concurrency_of_scan_scheduler 
/ parallel_tasks);
+
+    // ScanScheduler has 5 active threads and 10 tasks in queue.
+    // So remaining margin(3) is less than parallel_tasks(4).
+    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(5));
+    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
+    scanner_context->_scanner_scheduler = scheduler.get();
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 18;
+    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
+    ASSERT_EQ(margin, 1);
+
+    // ScanScheduler has 10 active threads and 2 tasks in queue.
+    // Remaining margin(8) is greater than parallel_tasks(4).
+    // So margin should be equal to margin(8)/parallel_tasks(4) == 2.
+    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, 
get_active_threads()).WillOnce(testing::Return(10));
+    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(2));
+    scanner_context->_scanner_scheduler = scheduler.get();
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
+    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
+    ASSERT_EQ(margin,
+              (scanner_context->_min_scan_concurrency_of_scan_scheduler - 12) 
/ parallel_tasks);
+
+    // ScanScheduler is busy.
+    // Just submit _min_scan_concurrency tasks.
+    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, 
get_active_threads()).WillOnce(testing::Return(50));
+    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
+    scanner_context->_scanner_scheduler = scheduler.get();
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
+    scanner_context->_num_scheduled_scanners = 0;
+    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
+    ASSERT_EQ(margin, scanner_context->_min_scan_concurrency);
+
+    // ScanScheduler is busy.
+    // _min_scan_concurrency is already satisfied.
+    scheduler = std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, 
get_active_threads()).WillOnce(testing::Return(50));
+    EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10));
+    scanner_context->_scanner_scheduler = scheduler.get();
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
+    scanner_context->_num_scheduled_scanners = 20;
+    margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
+    ASSERT_EQ(margin, 0);
+}
+
+TEST_F(ScannerContextTest, pull_next_scan_task) {
+    const int parallel_tasks = 4;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 11; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    std::mutex transfer_mutex;
+    std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
+    std::shared_mutex scheduler_mutex;
+    std::unique_lock<std::shared_mutex> scheduler_lock(scheduler_mutex);
+    scanner_context->_scanner_scheduler = scan_scheduler.get();
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
+    std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
std::make_shared<CgroupV2CpuCtl>(1);
+    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
+            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+
+    scanner_context->_scanner_scheduler = scan_scheduler.get();
+    scanner_context->_max_scan_concurrency = 1;
+    std::shared_ptr<ScanTask> pull_scan_task =
+            scanner_context->_pull_next_scan_task(nullptr, 
scanner_context->_max_scan_concurrency);
+    ASSERT_EQ(pull_scan_task, nullptr);
+    auto scan_task = 
std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
+    pull_scan_task = scanner_context->_pull_next_scan_task(scan_task,
+                                                           
scanner_context->_max_scan_concurrency);
+    ASSERT_EQ(pull_scan_task, nullptr);
+
+    scanner_context->_max_scan_concurrency = 2;
+    BlockUPtr cached_block = Block::create_unique();
+    scan_task->cached_blocks.emplace_back(std::move(cached_block), 0);
+    EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task(
+            scan_task, scanner_context->_max_scan_concurrency - 1));
+    scan_task->cached_blocks.clear();
+    scan_task->eos = true;
+    EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task(
+            scan_task, scanner_context->_max_scan_concurrency - 1));
+
+    scan_task->cached_blocks.clear();
+    scan_task->eos = false;
+    pull_scan_task = scanner_context->_pull_next_scan_task(
+            scan_task, scanner_context->_max_scan_concurrency - 1);
+    EXPECT_EQ(pull_scan_task.get(), scan_task.get());
+
+    scanner_context->_pending_scanners = 
std::stack<std::weak_ptr<ScannerDelegate>>();
+    pull_scan_task = scanner_context->_pull_next_scan_task(
+            nullptr, scanner_context->_max_scan_concurrency - 1);
+    EXPECT_EQ(pull_scan_task, nullptr);
+
+    
scanner_context->_pending_scanners.push(std::make_shared<ScannerDelegate>(scanner));
+    pull_scan_task = scanner_context->_pull_next_scan_task(
+            nullptr, scanner_context->_max_scan_concurrency - 1);
+    EXPECT_NE(pull_scan_task, nullptr);
+}
+
+TEST_F(ScannerContextTest, schedule_scan_task) {
+    const int parallel_tasks = 4;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 15; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    std::mutex transfer_mutex;
+    std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
+    std::shared_mutex scheduler_mutex;
+    std::unique_lock<std::shared_mutex> scheduler_lock(scheduler_mutex);
+    std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
std::make_shared<CgroupV2CpuCtl>(1);
+
+    // Scan resource is enough.
+    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
+            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, 
get_active_threads()).WillRepeatedly(testing::Return(0));
+    EXPECT_CALL(*scheduler, 
get_queue_size()).WillRepeatedly(testing::Return(0));
+
+    std::unique_ptr<MockScannerScheduler> scanner_scheduler =
+            std::make_unique<MockScannerScheduler>();
+    EXPECT_CALL(*scanner_scheduler, submit(testing::_, testing::_))
+            .WillRepeatedly(testing::Return(Status::OK()));
+
+    scanner_context->_scanner_scheduler_global = scanner_scheduler.get();
+    scanner_context->_scanner_scheduler = scheduler.get();
+    scanner_context->_max_scan_concurrency = 1;
+    scanner_context->_max_scan_concurrency = 1;
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
+
+    Status st = scanner_context->_schedule_scan_task(nullptr, transfer_lock, 
scheduler_lock);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(scanner_context->_num_scheduled_scanners, 1);
+
+    scanner_context->_max_scan_concurrency = 10;
+    scanner_context->_max_scan_concurrency = 1;
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
+    st = scanner_context->_schedule_scan_task(nullptr, transfer_lock, 
scheduler_lock);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(scanner_context->_num_scheduled_scanners, 
scanner_context->_max_scan_concurrency);
+
+    scanner_context = ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    scanner_context->_scanner_scheduler_global = scanner_scheduler.get();
+    scanner_context->_scanner_scheduler = scheduler.get();
+
+    scanner_context->_max_scan_concurrency = 100;
+    scanner_context->_min_scan_concurrency = 1;
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
+    int margin = scanner_context->_get_margin(transfer_lock, scheduler_lock);
+    ASSERT_EQ(margin, scanner_context->_min_scan_concurrency_of_scan_scheduler 
/ parallel_tasks);
+    st = scanner_context->_schedule_scan_task(nullptr, transfer_lock, 
scheduler_lock);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(scanner_context->_num_scheduled_scanners,
+              scanner_context->_min_scan_concurrency_of_scan_scheduler / 
parallel_tasks);
+
+    scanners = std::list<std::shared_ptr<ScannerDelegate>>();
+    for (int i = 0; i < 1; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    scanner_context = ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    scanner_context->_scanner_scheduler_global = scanner_scheduler.get();
+    scanner_context->_scanner_scheduler = scheduler.get();
+
+    scanner_context->_max_scan_concurrency = 1;
+    scanner_context->_min_scan_concurrency = 1;
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
+    st = scanner_context->_schedule_scan_task(nullptr, transfer_lock, 
scheduler_lock);
+    auto scan_task = 
std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
+    st = scanner_context->_schedule_scan_task(scan_task, transfer_lock, 
scheduler_lock);
+    // current scan task is added back.
+    ASSERT_EQ(scanner_context->_pending_scanners.size(), 1);
+    ASSERT_EQ(scanner_context->_num_scheduled_scanners, 1);
+
+    scanner_context = ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    scanner_context->_scanner_scheduler_global = scanner_scheduler.get();
+    scanner_context->_scanner_scheduler = scheduler.get();
+
+    scanner_context->_max_scan_concurrency = 1;
+    scanner_context->_min_scan_concurrency = 1;
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 20;
+    st = scanner_context->_schedule_scan_task(nullptr, transfer_lock, 
scheduler_lock);
+    scan_task = 
std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
+    scan_task->cached_blocks.emplace_back(Block::create_unique(), 0);
+    // Illegal situation.
+    // If the current scan task has a cached block, it should not be passed to 
this method.
+    EXPECT_ANY_THROW(std::ignore = 
scanner_context->_schedule_scan_task(scan_task, transfer_lock,
+                                                                        
scheduler_lock));
+}
+
+TEST_F(ScannerContextTest, scan_queue_mem_limit) {
+    state->_query_options.__set_scan_queue_mem_limit(100);
+    ASSERT_EQ(state->scan_queue_mem_limit(), 100);
+
+    state->_query_options.__isset.scan_queue_mem_limit = false;
+    state->_query_options.__set_mem_limit(200);
+    ASSERT_EQ(state->scan_queue_mem_limit(), 200 / 20);
+
+    const int parallel_tasks = 1;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+    olap_scan_local_state->_max_scan_concurrency = 
max_concurrency_counter.get();
+    olap_scan_local_state->_min_scan_concurrency = 
min_concurrency_counter.get();
+
+    olap_scan_local_state->_parent = scan_operator.get();
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 11; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+
+    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
+            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, 
testing::_))
+            .WillRepeatedly(testing::Return(Status::OK()));
+    scanner_context->_scanner_scheduler = scheduler.get();
+    // max_scan_concurrency that we calculate will be 10 / 1 = 10;
+    scanner_context->_min_scan_concurrency_of_scan_scheduler = 10;
+
+    std::ignore = scanner_context->init();
+    ASSERT_EQ(scanner_context->_max_bytes_in_queue, (1024 * 1024 * 10) * (1 / 
300 + 1));
+}
+
+TEST_F(ScannerContextTest, get_free_block) {
+    const int parallel_tasks = 1;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 11; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+    scanner_context->_newly_create_free_blocks_num = 
newly_create_free_blocks_num.get();
+    scanner_context->_newly_create_free_blocks_num->set(0L);
+    scanner_context->_scanner_memory_used_counter = 
scanner_memory_used_counter.get();
+    scanner_context->_scanner_memory_used_counter->set(0L);
+    BlockUPtr block = scanner_context->get_free_block(/*force=*/true);
+    ASSERT_NE(block, nullptr);
+    ASSERT_TRUE(scanner_context->_newly_create_free_blocks_num->value() == 1);
+
+    scanner_context->_max_bytes_in_queue = 200;
+    // no free block
+    // force is false, _block_memory_usage < _max_bytes_in_queue
+    block = scanner_context->get_free_block(/*force=*/false);
+    ASSERT_NE(block, nullptr);
+    ASSERT_TRUE(scanner_context->_newly_create_free_blocks_num->value() == 2);
+
+    std::unique_ptr<MockBlock> return_block = std::make_unique<MockBlock>();
+    EXPECT_CALL(*return_block, 
allocated_bytes()).WillRepeatedly(testing::Return(100));
+    EXPECT_CALL(*return_block, 
mem_reuse()).WillRepeatedly(testing::Return(true));
+    scanner_context->_free_blocks.enqueue(std::move(return_block));
+    // get free block from queue
+    block = scanner_context->get_free_block(/*force=*/false);
+    ASSERT_NE(block, nullptr);
+    ASSERT_EQ(scanner_context->_block_memory_usage, -100);
+    ASSERT_EQ(scanner_context->_scanner_memory_used_counter->value(), -100);
+}
+
+TEST_F(ScannerContextTest, return_free_block) {
+    const int parallel_tasks = 1;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 11; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+    scanner_context->_newly_create_free_blocks_num = 
newly_create_free_blocks_num.get();
+    scanner_context->_scanner_memory_used_counter = 
scanner_memory_used_counter.get();
+    scanner_context->_max_bytes_in_queue = 200;
+    scanner_context->_block_memory_usage = 0;
+
+    std::unique_ptr<MockBlock> return_block = std::make_unique<MockBlock>();
+    EXPECT_CALL(*return_block, 
allocated_bytes()).WillRepeatedly(testing::Return(100));
+    EXPECT_CALL(*return_block, 
mem_reuse()).WillRepeatedly(testing::Return(true));
+    EXPECT_CALL(*return_block, 
clear_column_data(testing::_)).WillRepeatedly(testing::Return());
+
+    scanner_context->return_free_block(std::move(return_block));
+    ASSERT_EQ(scanner_context->_block_memory_usage, 100);
+    ASSERT_EQ(scanner_context->_scanner_memory_used_counter->value(), 100);
+    // free_block queue is stabilized, so size_approx is accurate.
+    ASSERT_EQ(scanner_context->_free_blocks.size_approx(), 1);
+}
+
+TEST_F(ScannerContextTest, get_block_from_queue) {
+    const int parallel_tasks = 1;
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam 
{});
+
+    auto olap_scan_local_state =
+            pipeline::OlapScanLocalState::create_unique(state.get(), 
scan_operator.get());
+
+    const int64_t limit = 100;
+
+    NewOlapScanner::Params scanner_params;
+    scanner_params.state = state.get();
+    scanner_params.profile = profile.get();
+    scanner_params.limit = limit;
+    scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty
+
+    std::shared_ptr<VScanner> scanner =
+            NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params));
+
+    std::list<std::shared_ptr<ScannerDelegate>> scanners;
+    for (int i = 0; i < 11; ++i) {
+        scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
+    }
+
+    std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
+            state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
+            scanners, limit, scan_dependency, parallel_tasks);
+    scanner_context->_newly_create_free_blocks_num = 
newly_create_free_blocks_num.get();
+    scanner_context->_scanner_memory_used_counter = 
scanner_memory_used_counter.get();
+    scanner_context->_max_bytes_in_queue = 200;
+    scanner_context->_block_memory_usage = 0;
+
+    std::unique_ptr<MockBlock> return_block = std::make_unique<MockBlock>();
+    EXPECT_CALL(*return_block, 
allocated_bytes()).WillRepeatedly(testing::Return(100));
+    EXPECT_CALL(*return_block, 
mem_reuse()).WillRepeatedly(testing::Return(true));
+    EXPECT_CALL(*return_block, 
clear_column_data(testing::_)).WillRepeatedly(testing::Return());
+
+    std::unique_ptr<MockRuntimeState> mock_runtime_state = 
std::make_unique<MockRuntimeState>();
+    EXPECT_CALL(*mock_runtime_state, 
is_cancelled()).WillOnce(testing::Return(true));
+    EXPECT_CALL(*mock_runtime_state, cancel_reason())
+            .WillOnce(testing::Return(Status::Cancelled("TestCancelMsg")));
+    bool eos = false;
+    Status st = 
scanner_context->get_block_from_queue(mock_runtime_state.get(), 
return_block.get(),
+                                                      &eos, 0);
+    EXPECT_TRUE(!st.ok());
+    EXPECT_EQ(st.msg(), "TestCancelMsg");
+
+    EXPECT_CALL(*mock_runtime_state, 
is_cancelled()).WillRepeatedly(testing::Return(false));
+
+    scanner_context->_process_status = Status::InternalError("TestCancel");
+    st = scanner_context->get_block_from_queue(mock_runtime_state.get(), 
return_block.get(), &eos,
+                                               0);
+    EXPECT_TRUE(!st.ok());
+    EXPECT_TRUE(st.msg() == "TestCancel");
+
+    scanner_context->_process_status = Status::OK();
+    scanner_context->_is_finished = false;
+    scanner_context->_should_stop = false;
+    auto scan_task = 
std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
+    scan_task->set_eos(true);
+    scanner_context->_tasks_queue.push_back(scan_task);
+    std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
+            std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
+    EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, 
testing::_))
+            .WillOnce(testing::Return(Status::OK()));
+    scanner_context->_scanner_scheduler = scheduler.get();
+    scanner_context->_num_finished_scanners = 0;
+    EXPECT_CALL(*return_block, 
mem_reuse()).WillRepeatedly(testing::Return(false));
+    st = scanner_context->get_block_from_queue(mock_runtime_state.get(), 
return_block.get(), &eos,
+                                               0);
+    EXPECT_TRUE(st.ok());
+    EXPECT_EQ(scanner_context->_num_finished_scanners, 1);
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/exec/scan_operator_test.cpp 
b/be/test/vec/exec/scan_operator_test.cpp
new file mode 100644
index 00000000000..d5f36e31b3c
--- /dev/null
+++ b/be/test/vec/exec/scan_operator_test.cpp
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/Exprs_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/QueryCache_types.h>
+#include <gtest/gtest.h>
+
+#include "common/object_pool.h"
+#include "pipeline/exec/olap_scan_operator.h"
+#include "runtime/descriptors.h"
+
+namespace doris::vectorized {
+/// Fixture shared by the scan operator tests.
+///
+/// SetUp() fills a thrift descriptor table with one OLAP table descriptor and
+/// two tuple descriptors (ids 0 and 1, no slots) and converts it into the
+/// DescriptorTbl that the tests hand to the operator constructor.
+class ScanOperatorTest : public testing::Test {
+public:
+    void SetUp() override {
+        obj_pool = std::make_unique<ObjectPool>();
+        // This ScanNode has two tuples.
+        // First one is input tuple, second one is output tuple.
+        tbl_desc.tableType = TTableType::OLAP_TABLE;
+
+        tuple_desc.id = 0;
+        tuple_descs.push_back(tuple_desc);
+        tuple_desc.id = 1;
+        tuple_descs.push_back(tuple_desc);
+        thrift_tbl.tableDescriptors.push_back(tbl_desc);
+        thrift_tbl.tupleDescriptors = tuple_descs;
+        thrift_tbl.slotDescriptors = slot_descs;
+        scalar_type.__set_type(TPrimitiveType::STRING);
+
+        // The tests dereference `descs`, so a failed conversion has to stop the
+        // test here instead of surfacing later as a null dereference.
+        const auto create_status = DescriptorTbl::create(obj_pool.get(), thrift_tbl, &descs);
+        ASSERT_TRUE(create_status.ok()) << create_status.msg();
+        ASSERT_NE(descs, nullptr);
+    }
+
+protected:
+    // Members are protected (not private) because TEST_F bodies are compiled
+    // as subclasses of this fixture and access them directly.
+    std::unique_ptr<ObjectPool> obj_pool;
+    TTupleDescriptor tuple_desc;
+    std::vector<TTupleDescriptor> tuple_descs;
+    // Output of DescriptorTbl::create(); written once in SetUp().
+    DescriptorTbl* descs = nullptr;
+    TTableDescriptor tbl_desc;
+    TScalarType scalar_type;
+    TDescriptorTable thrift_tbl;
+    std::vector<TSlotDescriptor> slot_descs;
+    std::unique_ptr<RuntimeState> state = std::make_unique<RuntimeState>();
+};
+
+// Checks how OlapScanOperatorX::init() derives `_should_run_serial` from the
+// plan node (conjuncts, limit) and the two query options
+// `enable_adaptive_pipeline_task_serial_read_on_limit` and
+// `adaptive_pipeline_task_serial_read_on_limit`.
+//
+// The return value of init() is deliberately ignored throughout: only the
+// resulting flag is under test.
+TEST_F(ScanOperatorTest, adaptive_pipeline_task_serial_read_on_limit) {
+    const int parallel_pipeline_task_num = 24;
+    TPlanNode tnode;
+    tnode.row_tuples.push_back(TTupleId(0));
+    tnode.row_tuples.push_back(TTupleId(1));
+    std::vector<bool> null_map {false, false};
+    tnode.nullable_tuples = null_map;
+
+    // Scan with conjuncts
+    TExpr conjunct;
+    std::vector<TExpr> conjuncts {conjunct};
+    tnode.__set_conjuncts(conjuncts);
+    auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_pipeline_task_num, TQueryCacheParam {});
+
+    TQueryOptions query_options;
+    // enable_adaptive_pipeline_task_serial_read_on_limit is true
+    query_options.__set_enable_adaptive_pipeline_task_serial_read_on_limit(true);
+    state->set_query_options(query_options);
+
+    std::ignore = scan_operator->init(tnode, state.get());
+    // With conjuncts, should_run_serial is false
+    ASSERT_EQ(scan_operator->_should_run_serial, false);
+
+    // Scan without conjuncts
+    conjuncts.clear();
+    tnode.__set_conjuncts(conjuncts);
+    // limit 10
+    tnode.__set_limit(10);
+    scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
+            obj_pool.get(), tnode, 0, *descs, parallel_pipeline_task_num, TQueryCacheParam {});
+
+    // enable_adaptive_pipeline_task_serial_read_on_limit is true
+    query_options.__set_enable_adaptive_pipeline_task_serial_read_on_limit(true);
+    query_options.__set_adaptive_pipeline_task_serial_read_on_limit(10);
+    state->set_query_options(query_options);
+
+    std::ignore = scan_operator->init(tnode, state.get());
+    // Without conjuncts, limit 10 <= adaptive_pipeline_task_serial_read_on_limit 10
+    ASSERT_EQ(scan_operator->_should_run_serial, true);
+
+    query_options.__set_adaptive_pipeline_task_serial_read_on_limit(9);
+    state->set_query_options(query_options);
+    // The operator is reused and still carries `true` from the previous init();
+    // clear the flag (as the disabled case below does) so this assertion
+    // observes what this init() call decides rather than leftover state.
+    scan_operator->_should_run_serial = false;
+    std::ignore = scan_operator->init(tnode, state.get());
+    // Without conjuncts, limit 10 > adaptive_pipeline_task_serial_read_on_limit 9
+    ASSERT_EQ(scan_operator->_should_run_serial, false);
+
+    query_options.__set_enable_adaptive_pipeline_task_serial_read_on_limit(false);
+    query_options.__set_adaptive_pipeline_task_serial_read_on_limit(900);
+    state->set_query_options(query_options);
+    scan_operator->_should_run_serial = false;
+    std::ignore = scan_operator->init(tnode, state.get());
+    // Without conjuncts, enable_adaptive_pipeline_task_serial_read_on_limit is false
+    ASSERT_EQ(scan_operator->_should_run_serial, false);
+}
+} // namespace doris::vectorized
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to