This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 760d0580b8c [refactor](scheduler) Split simple scheduler and blocking 
scheduler (#55411)
760d0580b8c is described below

commit 760d0580b8cd8658fe5fb1dc7c03c857c821633e
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 29 16:42:11 2025 +0800

    [refactor](scheduler) Split simple scheduler and blocking scheduler (#55411)
---
 be/src/pipeline/exec/operator.cpp                |  6 ++++
 be/src/pipeline/exec/operator.h                  | 14 ++++++++
 be/src/pipeline/pipeline_fragment_context.cpp    |  2 +-
 be/src/pipeline/pipeline_task.cpp                | 20 ++++++-----
 be/src/pipeline/pipeline_task.h                  | 13 ++++---
 be/src/pipeline/task_scheduler.cpp               | 26 ++++++++++++--
 be/src/pipeline/task_scheduler.h                 | 46 +++++++++++++++++++-----
 be/src/runtime/workload_group/workload_group.cpp | 13 ++++---
 be/src/vec/exprs/vexpr.h                         |  2 ++
 be/src/vec/exprs/vexpr_context.cpp               |  4 +++
 be/src/vec/exprs/vexpr_context.h                 |  1 +
 be/test/pipeline/dummy_task_queue.h              | 19 ++++++++++
 be/test/pipeline/pipeline_task_test.cpp          | 18 ++++------
 be/test/pipeline/pipeline_test.cpp               | 31 ++++++++++------
 14 files changed, 161 insertions(+), 54 deletions(-)

diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 7aab3e99c74..4ccf8351b8b 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -286,6 +286,12 @@ Status PipelineXLocalStateBase::filter_block(const 
vectorized::VExprContextSPtrs
     return Status::OK();
 }
 
+// Returns true if at least one projection expression context held in `_projections`
+// reports `is_blockable()`; returns false otherwise, including when there are no
+// projections.
+bool PipelineXLocalStateBase::is_blockable() const {
+    // Bind each context by const reference so the `VExprContextSPtr` handle is not
+    // copied for every element visited; nothing needs to be captured.
+    return std::any_of(
+            _projections.begin(), _projections.end(),
+            [](const vectorized::VExprContextSPtr& expr) { return expr->is_blockable(); });
+}
+
 Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* 
origin_block,
                                      vectorized::Block* output_block) const {
     auto* local_state = state->get_local_state(operator_id());
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 87e1feb4743..3f2124825fc 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -134,6 +134,12 @@ public:
         return Status::OK();
     }
 
+    /**
+     * Returns whether this operator is blockable for the given runtime state. A blockable
+     * pipeline task is expected to block in its next run, so it should be submitted to the
+     * blocking task scheduler.
+     */
+    virtual bool is_blockable(RuntimeState* state) const = 0;
+
     virtual void set_low_memory_mode(RuntimeState* state) {}
 
     [[nodiscard]] virtual bool require_data_distribution() const { return 
false; }
@@ -209,6 +215,7 @@ public:
     void set_num_rows_returned(int64_t value) { _num_rows_returned = value; }
 
     [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) 
const = 0;
+    [[nodiscard]] virtual bool is_blockable() const;
 
     virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
 
@@ -483,6 +490,7 @@ public:
     virtual Status terminate(RuntimeState* state) = 0;
     virtual Status close(RuntimeState* state, Status exec_status) = 0;
     [[nodiscard]] virtual bool is_finished() const { return false; }
+    [[nodiscard]] virtual bool is_blockable() const { return false; }
 
     [[nodiscard]] virtual std::string debug_string(int indentation_level) 
const = 0;
 
@@ -632,6 +640,9 @@ public:
     [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state, 
bool eos) {
         return state->minimum_operator_memory_required_bytes();
     }
+    // A sink operator is blockable exactly when the sink local state stored in `state` is.
+    bool is_blockable(RuntimeState* state) const override {
+        const auto& sink_local_state = *state->get_sink_local_state();
+        return sink_local_state.is_blockable();
+    }
 
     [[nodiscard]] bool is_spillable() const { return _spillable; }
 
@@ -873,6 +884,9 @@ public:
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
     [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const 
{ return true; }
+    // A non-sink operator is blockable when its own local state is. The state is looked up
+    // by `operator_id()`, the same way `do_projections` does. The sink local state must not
+    // be used here: PipelineTask::is_blockable() already queries the sink separately, and
+    // going through it would leave PipelineXLocalStateBase::is_blockable() (the projection
+    // check) unreachable.
+    bool is_blockable(RuntimeState* state) const override {
+        return state->get_local_state(operator_id())->is_blockable();
+    }
 
     Status prepare(RuntimeState* state) override;
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index b929699f294..800c021c71b 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1736,7 +1736,7 @@ Status PipelineFragmentContext::submit() {
     auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
     for (auto& task : _tasks) {
         for (auto& t : task) {
-            st = scheduler->schedule_task(t);
+            st = scheduler->submit(t);
             DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                             { st = 
Status::Aborted("PipelineFragmentContext.submit.failed"); });
             if (!st) {
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 66727c66f06..da3ea738c06 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -287,6 +287,16 @@ bool PipelineTask::_is_pending_finish() {
                    [&](Dependency* dep) -> bool { return 
dep->is_blocked_by(shared_from_this()); });
 }
 
+// A task is blockable when any of its operators, or its sink, reports that it is blockable
+// for this task's runtime state. HybridTaskScheduler::submit uses the result to choose
+// between the blocking scheduler and the simple scheduler.
+bool PipelineTask::is_blockable() const {
+    // Bind the operator handle by const reference to avoid copying it per element.
+    const auto op_is_blockable = [this](const OperatorPtr& op) -> bool {
+        return op->is_blockable(_state);
+    };
+    return std::any_of(_operators.begin(), _operators.end(), op_is_blockable) ||
+           _sink->is_blockable(_state);
+}
+
 bool PipelineTask::is_revoking() const {
     // Spilling may be in progress if eos is true.
     return std::any_of(_spill_dependencies.begin(), _spill_dependencies.end(),
@@ -376,9 +386,6 @@ Status PipelineTask::execute(bool* done) {
     cpu_time_stop_watch.start();
     SCOPED_ATTACH_TASK(_state);
     Defer running_defer {[&]() {
-        if (_task_queue) {
-            _task_queue->update_statistics(this, time_spent);
-        }
         int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
         _task_cpu_timer->update(delta_cpu_time);
         
fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
@@ -588,7 +595,7 @@ Status PipelineTask::execute(bool* done) {
         }
     }
 
-    RETURN_IF_ERROR(get_task_queue()->push_back(shared_from_this()));
+    
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(shared_from_this()));
     return Status::OK();
 }
 
@@ -694,9 +701,6 @@ Status PipelineTask::close(Status exec_status, bool 
close_sink) {
         _fresh_profile_counter();
     }
 
-    if (_task_queue) {
-        _task_queue->update_statistics(this, close_ns);
-    }
     if (close_sink) {
         RETURN_IF_ERROR(_state_transition(State::FINISHED));
     }
@@ -813,7 +817,7 @@ Status PipelineTask::wake_up(Dependency* dep) {
     _blocked_dep = nullptr;
     auto holder = std::dynamic_pointer_cast<PipelineTask>(shared_from_this());
     RETURN_IF_ERROR(_state_transition(PipelineTask::State::RUNNABLE));
-    RETURN_IF_ERROR(get_task_queue()->push_back(holder));
+    
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 052b4a8017a..61038d6e009 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -87,6 +87,12 @@ public:
                        : nullptr;
     }
 
+    /**
+     * Returns whether this pipeline task is blockable, i.e. it is expected to block in its
+     * next run and should therefore be submitted to the blocking task scheduler.
+     */
+    bool is_blockable() const;
+
     /**
      * `shared_state` is shared by different pipeline tasks. This function 
aims to establish
      * connections across related tasks.
@@ -123,12 +129,6 @@ public:
     // Execution phase should be terminated. This is called if this task is 
canceled or waken up early.
     void terminate();
 
-    PipelineTask& set_task_queue(MultiCoreTaskQueue* task_queue) {
-        _task_queue = task_queue;
-        return *this;
-    }
-    MultiCoreTaskQueue* get_task_queue() { return _task_queue; }
-
     // 1 used for update priority queue
     // note(wb) an ugly implementation, need refactor later
     // 1.1 pipeline task
@@ -198,7 +198,6 @@ private:
     std::unique_ptr<vectorized::Block> _block;
 
     std::weak_ptr<PipelineFragmentContext> _fragment_context;
-    MultiCoreTaskQueue* _task_queue = nullptr;
 
     // used for priority queue
     // it may be visited by different thread but there is no race condition
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 0bcc3a9a297..6b0e7ec8b05 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -70,7 +70,7 @@ Status TaskScheduler::start() {
     return Status::OK();
 }
 
-Status TaskScheduler::schedule_task(PipelineTaskSPtr task) {
+Status TaskScheduler::submit(PipelineTaskSPtr task) {
     return _task_queue.push_back(task);
 }
 
@@ -117,9 +117,11 @@ void TaskScheduler::_do_work(int index) {
             // Fragment already finished
             continue;
         }
-        
task->set_running(true).set_task_queue(&_task_queue).set_core_id(index);
+        task->set_running(true).set_core_id(index);
         bool done = false;
         auto status = Status::OK();
+        int64_t exec_ns = 0;
+        SCOPED_RAW_TIMER(&exec_ns);
         Defer task_running_defer {[&]() {
             // If fragment is finished, fragment context will be 
de-constructed with all tasks in it.
             if (done || !status.ok()) {
@@ -130,6 +132,7 @@ void TaskScheduler::_do_work(int index) {
             } else {
                 task->set_running(false);
             }
+            _task_queue.update_statistics(task.get(), exec_ns);
         }};
         bool canceled = fragment_context->is_canceled();
 
@@ -179,4 +182,23 @@ void TaskScheduler::stop() {
     }
 }
 
+// Routes `task` to one of the two inner schedulers: blockable tasks go to
+// `_blocking_scheduler`, every other task goes to `_simple_scheduler`.
+Status HybridTaskScheduler::submit(PipelineTaskSPtr task) {
+    TaskScheduler& target = task->is_blockable() ? _blocking_scheduler : _simple_scheduler;
+    return target.submit(task);
+}
+
+// Starts the blocking scheduler first and the simple scheduler second. If the blocking
+// scheduler fails to start, its status is returned and the simple scheduler is not started.
+Status HybridTaskScheduler::start() {
+    Status blocking_status = _blocking_scheduler.start();
+    if (!blocking_status.ok()) {
+        return blocking_status;
+    }
+    return _simple_scheduler.start();
+}
+
+void HybridTaskScheduler::stop() { // Stops both inner schedulers.
+    _blocking_scheduler.stop(); // The blocking scheduler is stopped first,
+    _simple_scheduler.stop();   // then the simple scheduler.
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 1947c5b35e2..85e39910500 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -42,30 +42,58 @@ class ThreadPool;
 
 namespace doris::pipeline {
 
+class HybridTaskScheduler;
 class TaskScheduler {
 public:
-    TaskScheduler(int core_num, std::string name, 
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
-            : _task_queue(core_num), _name(std::move(name)), 
_cgroup_cpu_ctl(cgroup_cpu_ctl) {}
-
-    ~TaskScheduler();
+    virtual ~TaskScheduler();
 
-    Status schedule_task(PipelineTaskSPtr task);
+    virtual Status submit(PipelineTaskSPtr task);
 
-    Status start();
+    virtual Status start();
 
-    void stop();
+    virtual void stop();
 
-    std::vector<int> thread_debug_info() { return 
_fix_thread_pool->debug_info(); }
+    virtual std::vector<std::pair<std::string, std::vector<int>>> 
thread_debug_info() {
+        return {{_name, _fix_thread_pool->debug_info()}};
+    }
 
 private:
+    friend class HybridTaskScheduler;
+
+    TaskScheduler(int core_num, std::string name, 
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
+            : _name(std::move(name)), _task_queue(core_num), 
_cgroup_cpu_ctl(cgroup_cpu_ctl) {}
+    TaskScheduler() : _task_queue(0) {}
+    std::string _name;
     std::unique_ptr<ThreadPool> _fix_thread_pool;
 
     MultiCoreTaskQueue _task_queue;
     bool _need_to_stop = false;
     bool _shutdown = false;
-    std::string _name;
     std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
 
     void _do_work(int index);
 };
+
+// Scheduler that owns two plain TaskSchedulers and dispatches between them:
+// `_blocking_scheduler`, created with twice `core_num`, and `_simple_scheduler`, created
+// with `core_num`. Both are handed the same cgroup CPU controller.
+class HybridTaskScheduler MOCK_REMOVE(final) : public TaskScheduler {
+public:
+    HybridTaskScheduler(int core_num, std::string name,
+                        std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
+            : _blocking_scheduler(core_num * 2, name + "_blocking_scheduler", cgroup_cpu_ctl),
+              _simple_scheduler(core_num, name + "_simple_scheduler", cgroup_cpu_ctl) {}
+
+    // Hands `task` to one of the two inner schedulers.
+    Status submit(PipelineTaskSPtr task) override;
+
+    // Starts both inner schedulers.
+    Status start() override;
+
+    // Stops both inner schedulers.
+    void stop() override;
+
+    // Returns two entries: the first entry reported by the blocking scheduler, followed by
+    // the first entry reported by the simple scheduler.
+    std::vector<std::pair<std::string, std::vector<int>>> thread_debug_info() override {
+        auto blocking_info = _blocking_scheduler.thread_debug_info();
+        auto simple_info = _simple_scheduler.thread_debug_info();
+        return {blocking_info[0], simple_info[0]};
+    }
+
+private:
+    TaskScheduler _blocking_scheduler;
+    TaskScheduler _simple_scheduler;
+};
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 5cf23ca4c41..b7bf59a80a3 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -531,8 +531,8 @@ Status 
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
     // 1 create thread pool
     if (_task_sched == nullptr) {
         std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
-                
std::make_unique<pipeline::TaskScheduler>(pipeline_exec_thread_num, "p_" + 
wg_name,
-                                                          cg_cpu_ctl_ptr);
+                
std::make_unique<pipeline::HybridTaskScheduler>(pipeline_exec_thread_num,
+                                                                "p_" + 
wg_name, cg_cpu_ctl_ptr);
         Status ret = pipeline_task_scheduler->start();
         if (ret.ok()) {
             _task_sched = std::move(pipeline_task_scheduler);
@@ -650,9 +650,12 @@ void 
WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sc
 std::string WorkloadGroup::thread_debug_info() {
     std::string str = "";
     if (_task_sched != nullptr) {
-        std::vector<int> exec_t_info = _task_sched->thread_debug_info();
-        str = fmt::format("[exec num:{}, real_num:{}, min_num:{}, 
max_num:{}],", exec_t_info[0],
-                          exec_t_info[1], exec_t_info[2], exec_t_info[3]);
+        // `thread_debug_info()` returns one (name, counters) entry per underlying scheduler.
+        // Append each entry: assigning to `str` inside the loop would keep only the last one.
+        const auto exec_t_info = _task_sched->thread_debug_info();
+        for (const auto& info : exec_t_info) {
+            str += fmt::format(
+                    "scheduler: {}, info: [exec num:{}, real_num:{}, min_num:{}, max_num:{}],",
+                    info.first, info.second[0], info.second[1], info.second[2], info.second[3]);
+        }
     }
 
     if (_scan_task_sched != nullptr) {
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 6b6d8881685..151cd270f13 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -130,6 +130,8 @@ public:
     }
 
     virtual Status execute(VExprContext* context, Block* block, int* 
result_column_id) = 0;
+    // `is_blockable` means this expr will be blocked in `execute` (e.g. AI 
Function, Remote Function)
+    [[nodiscard]] virtual bool is_blockable() const { return false; }
 
     // execute current expr with inverted index to filter block. Given a 
roaring bitmap of match rows
     virtual Status evaluate_inverted_index(VExprContext* context, uint32_t 
segment_num_rows) {
diff --git a/be/src/vec/exprs/vexpr_context.cpp 
b/be/src/vec/exprs/vexpr_context.cpp
index 597d1a92888..4d33aec4ea5 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -71,6 +71,10 @@ Status VExprContext::execute(vectorized::Block* block, int* 
result_column_id) {
     return st;
 }
 
+bool VExprContext::is_blockable() const { // A context is blockable iff its root expr is.
+    return _root->is_blockable(); // NOTE(review): assumes `_root` is already set — confirm.
+}
+
 Status VExprContext::prepare(RuntimeState* state, const RowDescriptor& 
row_desc) {
     _prepared = true;
     Status st;
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index bfe75383b7e..144206fc430 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -172,6 +172,7 @@ public:
     [[nodiscard]] Status open(RuntimeState* state);
     [[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx);
     [[nodiscard]] Status execute(Block* block, int* result_column_id);
+    [[nodiscard]] bool is_blockable() const;
 
     VExprSPtr root() { return _root; }
     void set_root(const VExprSPtr& expr) { _root = expr; }
diff --git a/be/test/pipeline/dummy_task_queue.h 
b/be/test/pipeline/dummy_task_queue.h
index 9b609f41fc8..500d29c24b6 100644
--- a/be/test/pipeline/dummy_task_queue.h
+++ b/be/test/pipeline/dummy_task_queue.h
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "pipeline/task_queue.h"
+#include "pipeline/task_scheduler.h"
 
 namespace doris::pipeline {
 
@@ -47,4 +48,22 @@ class DummyTaskQueue final : public MultiCoreTaskQueue {
         return task;
     }
 };
+
+// Test double for TaskScheduler. `start()` and `stop()` do nothing, and every submitted
+// task is pushed onto a DummyTaskQueue so that tests can take tasks back out explicitly.
+class MockTaskScheduler : public TaskScheduler {
+public:
+    MockTaskScheduler() : _task_queue(std::make_unique<DummyTaskQueue>(1)) {}
+
+    // Parks the task in the dummy queue instead of scheduling it.
+    Status submit(PipelineTaskSPtr task) override { return _task_queue->push_back(task); }
+
+    // Nothing to start; always succeeds.
+    Status start() override { return Status::OK(); }
+
+    // Nothing to stop.
+    void stop() override {}
+
+    // No thread pool is owned, so no entries are reported.
+    std::vector<std::pair<std::string, std::vector<int>>> thread_debug_info() override {
+        return {};
+    }
+
+private:
+    std::unique_ptr<DummyTaskQueue> _task_queue;
+};
 } // namespace doris::pipeline
diff --git a/be/test/pipeline/pipeline_task_test.cpp 
b/be/test/pipeline/pipeline_task_test.cpp
index f5a08fd56e6..e885a377725 100644
--- a/be/test/pipeline/pipeline_task_test.cpp
+++ b/be/test/pipeline/pipeline_task_test.cpp
@@ -53,7 +53,8 @@ public:
         _query_ctx =
                 QueryContext::create(_query_id, ExecEnv::GetInstance(), 
_query_options, fe_address,
                                      true, fe_address, 
QuerySource::INTERNAL_FRONTEND);
-        _task_queue = std::make_unique<DummyTaskQueue>(1);
+        _task_scheduler = std::make_unique<MockTaskScheduler>();
+        _query_ctx->_task_scheduler = _task_scheduler.get();
         _build_fragment_context();
     }
     void TearDown() override {
@@ -83,7 +84,7 @@ private:
     TUniqueId _query_id = TUniqueId();
     std::unique_ptr<ThreadMemTrackerMgr> _thread_mem_tracker_mgr;
     TQueryOptions _query_options;
-    std::unique_ptr<DummyTaskQueue> _task_queue;
+    std::unique_ptr<MockTaskScheduler> _task_scheduler;
     const std::string LOCALHOST = BackendOptions::get_localhost();
     const int DUMMY_PORT = config::brpc_port;
 };
@@ -306,7 +307,6 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) {
     auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
                                                profile.get(), 
shared_state_map, task_id);
     task->_exec_time_slice = 10'000'000'000ULL;
-    task->set_task_queue(_task_queue.get());
     {
         // `execute` should be called after `prepare`
         bool done = false;
@@ -439,7 +439,6 @@ TEST_F(PipelineTaskTest, TEST_TERMINATE) {
     auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
                                                profile.get(), 
shared_state_map, task_id);
     task->_exec_time_slice = 10'000'000'000ULL;
-    task->set_task_queue(_task_queue.get());
     {
         std::vector<TScanRangeParams> scan_range;
         int sender_id = 0;
@@ -504,7 +503,6 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) {
     auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
                                                profile.get(), 
shared_state_map, task_id);
     task->_exec_time_slice = 10'000'000'000ULL;
-    task->set_task_queue(_task_queue.get());
     {
         std::vector<TScanRangeParams> scan_range;
         int sender_id = 0;
@@ -549,7 +547,6 @@ TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
     auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
                                                profile.get(), 
shared_state_map, task_id);
     task->_exec_time_slice = 10'000'000'000ULL;
-    task->set_task_queue(_task_queue.get());
     {
         std::vector<TScanRangeParams> scan_range;
         int sender_id = 0;
@@ -630,7 +627,6 @@ TEST_F(PipelineTaskTest, TEST_SINK_EOF) {
     auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
                                                profile.get(), 
shared_state_map, task_id);
     task->_exec_time_slice = 10'000'000'000ULL;
-    task->set_task_queue(_task_queue.get());
     {
         std::vector<TScanRangeParams> scan_range;
         int sender_id = 0;
@@ -669,7 +665,8 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
         _query_ctx =
                 QueryContext::create(_query_id, ExecEnv::GetInstance(), 
_query_options, fe_address,
                                      true, fe_address, 
QuerySource::INTERNAL_FRONTEND);
-        _task_queue = std::make_unique<DummyTaskQueue>(1);
+        _task_scheduler = std::make_unique<MockTaskScheduler>();
+        _query_ctx->_task_scheduler = _task_scheduler.get();
         _build_fragment_context();
 
         TWorkloadGroupInfo twg_info;
@@ -711,7 +708,6 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
     _runtime_state->resize_op_id_to_local_state(-1);
     auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
                                                profile.get(), 
shared_state_map, task_id);
-    task->set_task_queue(_task_queue.get());
     {
         std::vector<TScanRangeParams> scan_range;
         int sender_id = 0;
@@ -802,7 +798,8 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
         _query_ctx =
                 QueryContext::create(_query_id, ExecEnv::GetInstance(), 
_query_options, fe_address,
                                      true, fe_address, 
QuerySource::INTERNAL_FRONTEND);
-        _task_queue = std::make_unique<DummyTaskQueue>(1);
+        _task_scheduler = std::make_unique<MockTaskScheduler>();
+        _query_ctx->_task_scheduler = _task_scheduler.get();
         _build_fragment_context();
 
         TWorkloadGroupInfo twg_info;
@@ -847,7 +844,6 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
     _runtime_state->resize_op_id_to_local_state(-1);
     auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
                                                profile.get(), 
shared_state_map, task_id);
-    task->set_task_queue(_task_queue.get());
     {
         std::vector<TScanRangeParams> scan_range;
         int sender_id = 0;
diff --git a/be/test/pipeline/pipeline_test.cpp 
b/be/test/pipeline/pipeline_test.cpp
index 391d99231f3..b59d9f3b7d6 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -66,7 +66,8 @@ public:
         _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
                 TRuntimeFilterParamsBuilder().build());
         ExecEnv::GetInstance()->set_stream_mgr(_mgr.get());
-        _task_queue = std::make_unique<DummyTaskQueue>(1);
+        _task_scheduler = std::make_unique<MockTaskScheduler>();
+        _query_ctx->_task_scheduler = _task_scheduler.get();
     }
     void TearDown() override {}
 
@@ -110,6 +111,8 @@ private:
         _query_ctx =
                 QueryContext::create(_query_id, ExecEnv::GetInstance(), 
_query_options, fe_address,
                                      true, fe_address, 
QuerySource::INTERNAL_FRONTEND);
+        _task_scheduler = std::make_unique<MockTaskScheduler>();
+        _query_ctx->_task_scheduler = _task_scheduler.get();
         _runtime_state.clear();
         _context.clear();
         _fragment_id = 0;
@@ -135,7 +138,7 @@ private:
     std::shared_ptr<QueryContext> _query_ctx;
     TUniqueId _query_id = TUniqueId();
     TQueryOptions _query_options;
-    std::unique_ptr<DummyTaskQueue> _task_queue;
+    std::unique_ptr<MockTaskScheduler> _task_scheduler;
 
     // Fragment level
     // Fragment0 -> Fragment1
@@ -279,7 +282,6 @@ TEST_F(PipelineTest, HAPPY_PATH) {
                 cur_pipe, task_id, local_runtime_state.get(), _context.back(),
                 _pipeline_profiles[cur_pipe->id()].get(), shared_state_map, 
task_id);
         cur_pipe->incr_created_tasks(task_id, task.get());
-        task->set_task_queue(_task_queue.get());
         _pipeline_tasks[cur_pipe->id()].push_back(std::move(task));
         
_runtime_states[cur_pipe->id()].push_back(std::move(local_runtime_state));
     }
@@ -329,7 +331,8 @@ TEST_F(PipelineTest, HAPPY_PATH) {
     EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_wait_to_start(), true);
     _query_ctx->get_execution_dependency()->set_ready();
     // Task is ready and be push into runnable task queue.
-    EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+    
EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0)
 != nullptr,
+              true);
     EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_wait_to_start(), false);
 
     bool eos = false;
@@ -351,7 +354,8 @@ TEST_F(PipelineTest, HAPPY_PATH) {
     EXPECT_EQ(block.columns(), 0);
 
     // Task is ready since a new block reached.
-    EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+    
EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0)
 != nullptr,
+              true);
     EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
                           [](const auto& dep) { return dep->ready(); }),
               true);
@@ -379,7 +383,8 @@ TEST_F(PipelineTest, HAPPY_PATH) {
     EXPECT_EQ(block.columns(), 0);
 
     // Task is ready since a new block reached.
-    EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+    
EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0)
 != nullptr,
+              true);
     EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
                           [](const auto& dep) { return dep->ready(); }),
               true);
@@ -448,7 +453,8 @@ TEST_F(PipelineTest, HAPPY_PATH) {
 
     // Upstream task finished.
     local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
-    EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+    
EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0)
 != nullptr,
+              true);
     {
         // Blocked by exchange read dependency due to no data reached.
         EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->execute(&eos), 
Status::OK());
@@ -937,7 +943,6 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                         _pipelines[i], task_id, local_runtime_state.get(), 
_context.back(),
                         _pipeline_profiles[_pipelines[i]->id()].get(), 
shared_state_map, j);
                 _pipelines[i]->incr_created_tasks(j, task.get());
-                task->set_task_queue(_task_queue.get());
                 
_pipeline_tasks[_pipelines[i]->id()].push_back(std::move(task));
                 
_runtime_states[_pipelines[i]->id()].push_back(std::move(local_runtime_state));
             }
@@ -993,7 +998,9 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
         for (size_t i = 0; i < _pipelines.size(); i++) {
             for (int j = 0; j < parallelism; j++) {
                 // Task is ready and be push into runnable task queue.
-                EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+                
EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0)
 !=
+                                  nullptr,
+                          true);
             }
         }
         for (size_t i = 0; i < _pipelines.size(); i++) {
@@ -1056,9 +1063,11 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
         // Pipeline 0 is blocked by hash join dependency and is still waiting 
for upstream tasks done.
         for (int j = 0; j < parallelism; j++) {
             // Task is ready and be push into runnable task queue.
-            EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+            
EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0)
 !=
+                              nullptr,
+                      true);
         }
-        EXPECT_EQ(_task_queue->take(0), nullptr);
+        
EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0),
 nullptr);
         for (int j = 0; j < parallelism; j++) {
             EXPECT_EQ(_pipeline_tasks[1][j]->_is_blocked(), false);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to