This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 760d0580b8c [refactor](scheduler) Split simple scheduler and blocking scheduler (#55411)
760d0580b8c is described below
commit 760d0580b8cd8658fe5fb1dc7c03c857c821633e
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 29 16:42:11 2025 +0800
[refactor](scheduler) Split simple scheduler and blocking scheduler (#55411)
---
be/src/pipeline/exec/operator.cpp | 6 ++++
be/src/pipeline/exec/operator.h | 14 ++++++++
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
be/src/pipeline/pipeline_task.cpp | 20 ++++++-----
be/src/pipeline/pipeline_task.h | 13 ++++---
be/src/pipeline/task_scheduler.cpp | 26 ++++++++++++--
be/src/pipeline/task_scheduler.h | 46 +++++++++++++++++++-----
be/src/runtime/workload_group/workload_group.cpp | 13 ++++---
be/src/vec/exprs/vexpr.h | 2 ++
be/src/vec/exprs/vexpr_context.cpp | 4 +++
be/src/vec/exprs/vexpr_context.h | 1 +
be/test/pipeline/dummy_task_queue.h | 19 ++++++++++
be/test/pipeline/pipeline_task_test.cpp | 18 ++++------
be/test/pipeline/pipeline_test.cpp | 31 ++++++++++------
14 files changed, 161 insertions(+), 54 deletions(-)
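
At a glance, the heart of this change is a routing decision made at submit time: a task that reports itself as blockable is sent to a dedicated blocking scheduler, and everything else goes to a simple scheduler. A minimal self-contained sketch of that routing pattern follows; Task, Pool, and HybridScheduler here are simplified stand-ins for illustration, not the actual Doris classes.

// Minimal sketch of the submit-time routing this patch introduces. All
// types below are simplified stand-ins, not the actual Doris classes.
#include <functional>
#include <iostream>
#include <queue>
#include <utility>

struct Task {
    bool blockable;           // analogous to PipelineTask::is_blockable()
    std::function<void()> fn; // the work to run
};

class Pool { // stand-in for a TaskScheduler backed by a thread pool
public:
    explicit Pool(const char* name) : _name(name) {}
    void submit(Task t) {
        std::cout << "routed to " << _name << "\n";
        _queue.push(std::move(t));
    }

private:
    const char* _name;
    std::queue<Task> _queue;
};

class HybridScheduler { // analogous to HybridTaskScheduler
public:
    void submit(Task t) {
        // A blockable task may park its worker thread, so it gets its own
        // (larger) pool; the simple pool stays free of blocked threads.
        if (t.blockable) {
            _blocking.submit(std::move(t));
        } else {
            _simple.submit(std::move(t));
        }
    }

private:
    Pool _blocking{"blocking_scheduler"};
    Pool _simple{"simple_scheduler"};
};

int main() {
    HybridScheduler sched;
    sched.submit({/*blockable=*/true, [] {}});  // -> blocking_scheduler
    sched.submit({/*blockable=*/false, [] {}}); // -> simple_scheduler
}
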
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 7aab3e99c74..4ccf8351b8b 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -286,6 +286,12 @@ Status PipelineXLocalStateBase::filter_block(const vectorized::VExprContextSPtrs
return Status::OK();
}
+bool PipelineXLocalStateBase::is_blockable() const {
+ return std::any_of(
+ _projections.begin(), _projections.end(),
+            [&](vectorized::VExprContextSPtr expr) -> bool { return expr->is_blockable(); });
+}
+
Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block,
                                     vectorized::Block* output_block) const {
auto* local_state = state->get_local_state(operator_id());
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 87e1feb4743..3f2124825fc 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -134,6 +134,12 @@ public:
return Status::OK();
}
+ /**
+     * A pipeline task being blockable means it will be blocked in its next run, so we
+     * should put the pipeline task into the blocking task scheduler.
+ */
+ virtual bool is_blockable(RuntimeState* state) const = 0;
+
virtual void set_low_memory_mode(RuntimeState* state) {}
    [[nodiscard]] virtual bool require_data_distribution() const { return false; }
@@ -209,6 +215,7 @@ public:
void set_num_rows_returned(int64_t value) { _num_rows_returned = value; }
    [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const = 0;
+ [[nodiscard]] virtual bool is_blockable() const;
virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
@@ -483,6 +490,7 @@ public:
virtual Status terminate(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state, Status exec_status) = 0;
[[nodiscard]] virtual bool is_finished() const { return false; }
+ [[nodiscard]] virtual bool is_blockable() const { return false; }
    [[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0;
@@ -632,6 +640,9 @@ public:
    [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state, bool eos) {
return state->minimum_operator_memory_required_bytes();
}
+ bool is_blockable(RuntimeState* state) const override {
+ return state->get_sink_local_state()->is_blockable();
+ }
[[nodiscard]] bool is_spillable() const { return _spillable; }
@@ -873,6 +884,9 @@ public:
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
    [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }
+ bool is_blockable(RuntimeState* state) const override {
+ return state->get_sink_local_state()->is_blockable();
+ }
Status prepare(RuntimeState* state) override;
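
The is_blockable() hooks above are the first links in a chain this patch threads through the engine: an expr reports itself blockable, the local state aggregates that over its projections with std::any_of, and (further down, in pipeline_task.cpp) the task ORs the result across all operators plus the sink. A compressed stand-alone sketch of that cascade, using simplified stand-in types rather than the Doris hierarchy:

// Compressed stand-alone sketch of the is_blockable() cascade. Expr,
// Operator, and Task are stand-ins for VExpr, the operator local state,
// and PipelineTask; they are not the Doris types.
#include <algorithm>
#include <memory>
#include <vector>

struct Expr { // stand-in for vectorized::VExpr
    bool blockable = false;
    bool is_blockable() const { return blockable; }
};

struct Operator { // stand-in for an operator and its projection exprs
    std::vector<std::shared_ptr<Expr>> projections;
    // Blockable if any projection expr may block, mirroring
    // PipelineXLocalStateBase::is_blockable() above.
    bool is_blockable() const {
        return std::any_of(projections.begin(), projections.end(),
                           [](const auto& e) { return e->is_blockable(); });
    }
};

struct Task { // stand-in for PipelineTask
    std::vector<Operator> operators;
    Operator sink;
    // Blockable if any operator (or the sink) is blockable.
    bool is_blockable() const {
        return std::any_of(operators.begin(), operators.end(),
                           [](const Operator& op) { return op.is_blockable(); }) ||
               sink.is_blockable();
    }
};

int main() {
    Operator op;
    op.projections.push_back(std::make_shared<Expr>(Expr{/*blockable=*/true}));
    Task task;
    task.operators.push_back(op);
    // One blockable projection expr makes the whole task blockable.
    return task.is_blockable() ? 0 : 1;
}
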
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index b929699f294..800c021c71b 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1736,7 +1736,7 @@ Status PipelineFragmentContext::submit() {
auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
for (auto& task : _tasks) {
for (auto& t : task) {
- st = scheduler->schedule_task(t);
+ st = scheduler->submit(t);
DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
if (!st) {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 66727c66f06..da3ea738c06 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -287,6 +287,16 @@ bool PipelineTask::_is_pending_finish() {
                       [&](Dependency* dep) -> bool { return dep->is_blocked_by(shared_from_this()); });
}
+bool PipelineTask::is_blockable() const {
+    // Before a task starts, we should make sure
+    // 1. Execution dependency is ready (which is controlled by FE 2-phase commit)
+    // 2. Runtime filter dependencies are ready
+    // 3. All tablets are loaded into local storage
+    return std::any_of(_operators.begin(), _operators.end(),
+                       [&](OperatorPtr op) -> bool { return op->is_blockable(_state); }) ||
+           _sink->is_blockable(_state);
+}
+
bool PipelineTask::is_revoking() const {
// Spilling may be in progress if eos is true.
return std::any_of(_spill_dependencies.begin(), _spill_dependencies.end(),
@@ -376,9 +386,6 @@ Status PipelineTask::execute(bool* done) {
cpu_time_stop_watch.start();
SCOPED_ATTACH_TASK(_state);
Defer running_defer {[&]() {
- if (_task_queue) {
- _task_queue->update_statistics(this, time_spent);
- }
int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
_task_cpu_timer->update(delta_cpu_time);
fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
@@ -588,7 +595,7 @@ Status PipelineTask::execute(bool* done) {
}
}
- RETURN_IF_ERROR(get_task_queue()->push_back(shared_from_this()));
+    RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(shared_from_this()));
return Status::OK();
}
@@ -694,9 +701,6 @@ Status PipelineTask::close(Status exec_status, bool close_sink) {
_fresh_profile_counter();
}
- if (_task_queue) {
- _task_queue->update_statistics(this, close_ns);
- }
if (close_sink) {
RETURN_IF_ERROR(_state_transition(State::FINISHED));
}
@@ -813,7 +817,7 @@ Status PipelineTask::wake_up(Dependency* dep) {
_blocked_dep = nullptr;
auto holder = std::dynamic_pointer_cast<PipelineTask>(shared_from_this());
RETURN_IF_ERROR(_state_transition(PipelineTask::State::RUNNABLE));
- RETURN_IF_ERROR(get_task_queue()->push_back(holder));
+    RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(holder));
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 052b4a8017a..61038d6e009 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -87,6 +87,12 @@ public:
: nullptr;
}
+ /**
+     * A pipeline task being blockable means it will be blocked in its next run, so we
+     * should put it into the blocking task scheduler.
+ */
+ bool is_blockable() const;
+
/**
     * `shared_state` is shared by different pipeline tasks. This function aims to establish
* connections across related tasks.
@@ -123,12 +129,6 @@ public:
    // Execution phase should be terminated. This is called if this task is canceled or waken up early.
void terminate();
- PipelineTask& set_task_queue(MultiCoreTaskQueue* task_queue) {
- _task_queue = task_queue;
- return *this;
- }
- MultiCoreTaskQueue* get_task_queue() { return _task_queue; }
-
// 1 used for update priority queue
// note(wb) an ugly implementation, need refactor later
// 1.1 pipeline task
@@ -198,7 +198,6 @@ private:
std::unique_ptr<vectorized::Block> _block;
std::weak_ptr<PipelineFragmentContext> _fragment_context;
- MultiCoreTaskQueue* _task_queue = nullptr;
// used for priority queue
// it may be visited by different thread but there is no race condition
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 0bcc3a9a297..6b0e7ec8b05 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -70,7 +70,7 @@ Status TaskScheduler::start() {
return Status::OK();
}
-Status TaskScheduler::schedule_task(PipelineTaskSPtr task) {
+Status TaskScheduler::submit(PipelineTaskSPtr task) {
return _task_queue.push_back(task);
}
@@ -117,9 +117,11 @@ void TaskScheduler::_do_work(int index) {
// Fragment already finished
continue;
}
-        task->set_running(true).set_task_queue(&_task_queue).set_core_id(index);
+ task->set_running(true).set_core_id(index);
bool done = false;
auto status = Status::OK();
+ int64_t exec_ns = 0;
+ SCOPED_RAW_TIMER(&exec_ns);
Defer task_running_defer {[&]() {
            // If fragment is finished, fragment context will be de-constructed with all tasks in it.
if (done || !status.ok()) {
@@ -130,6 +132,7 @@ void TaskScheduler::_do_work(int index) {
} else {
task->set_running(false);
}
+ _task_queue.update_statistics(task.get(), exec_ns);
}};
bool canceled = fragment_context->is_canceled();
@@ -179,4 +182,23 @@ void TaskScheduler::stop() {
}
}
+Status HybridTaskScheduler::submit(PipelineTaskSPtr task) {
+ if (task->is_blockable()) {
+ return _blocking_scheduler.submit(task);
+ } else {
+ return _simple_scheduler.submit(task);
+ }
+}
+
+Status HybridTaskScheduler::start() {
+ RETURN_IF_ERROR(_blocking_scheduler.start());
+ RETURN_IF_ERROR(_simple_scheduler.start());
+ return Status::OK();
+}
+
+void HybridTaskScheduler::stop() {
+ _blocking_scheduler.stop();
+ _simple_scheduler.stop();
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 1947c5b35e2..85e39910500 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -42,30 +42,58 @@ class ThreadPool;
namespace doris::pipeline {
+class HybridTaskScheduler;
class TaskScheduler {
public:
-    TaskScheduler(int core_num, std::string name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
-            : _task_queue(core_num), _name(std::move(name)), _cgroup_cpu_ctl(cgroup_cpu_ctl) {}
-
- ~TaskScheduler();
+ virtual ~TaskScheduler();
- Status schedule_task(PipelineTaskSPtr task);
+ virtual Status submit(PipelineTaskSPtr task);
- Status start();
+ virtual Status start();
- void stop();
+ virtual void stop();
-    std::vector<int> thread_debug_info() { return _fix_thread_pool->debug_info(); }
+    virtual std::vector<std::pair<std::string, std::vector<int>>> thread_debug_info() {
+        return {{_name, _fix_thread_pool->debug_info()}};
+    }
private:
+ friend class HybridTaskScheduler;
+
+    TaskScheduler(int core_num, std::string name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
+            : _name(std::move(name)), _task_queue(core_num), _cgroup_cpu_ctl(cgroup_cpu_ctl) {}
+ TaskScheduler() : _task_queue(0) {}
+ std::string _name;
std::unique_ptr<ThreadPool> _fix_thread_pool;
MultiCoreTaskQueue _task_queue;
bool _need_to_stop = false;
bool _shutdown = false;
- std::string _name;
std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
void _do_work(int index);
};
+
+class HybridTaskScheduler MOCK_REMOVE(final) : public TaskScheduler {
+public:
+ HybridTaskScheduler(int core_num, std::string name,
+ std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
+            : _blocking_scheduler(core_num * 2, name + "_blocking_scheduler", cgroup_cpu_ctl),
+              _simple_scheduler(core_num, name + "_simple_scheduler", cgroup_cpu_ctl) {}
+
+ Status submit(PipelineTaskSPtr task) override;
+
+ Status start() override;
+
+ void stop() override;
+
+    std::vector<std::pair<std::string, std::vector<int>>> thread_debug_info() override {
+ return {_blocking_scheduler.thread_debug_info()[0],
+ _simple_scheduler.thread_debug_info()[0]};
+ }
+
+private:
+ TaskScheduler _blocking_scheduler;
+ TaskScheduler _simple_scheduler;
+};
} // namespace doris::pipeline
\ No newline at end of file
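
Worth noting in the header above: the blocking pool is sized at core_num * 2 while the simple pool keeps core_num. The extra headroom compensates for workers that sit parked in blocking calls and contribute no CPU work. A toy stand-alone illustration of the effect (assuming uniform ~100 ms blocking calls and using plain std::thread, not the Doris thread pool):

// Toy illustration (not the Doris thread pool): N tasks that each block
// ~100 ms on a pool of K workers take roughly ceil(N / K) * 100 ms of
// wall time, so a pool hosting blocking tasks benefits from extra threads.
#include <algorithm>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

using namespace std::chrono;

milliseconds run(int num_tasks, int pool_size) {
    auto start = steady_clock::now();
    for (int done = 0; done < num_tasks; done += pool_size) {
        std::vector<std::thread> workers;
        int batch = std::min(pool_size, num_tasks - done);
        for (int i = 0; i < batch; ++i) {
            // Stand-in for a blocking call (e.g. a remote function).
            workers.emplace_back([] { std::this_thread::sleep_for(100ms); });
        }
        for (auto& w : workers) w.join();
    }
    return duration_cast<milliseconds>(steady_clock::now() - start);
}

int main() {
    const int cores = 4;
    std::cout << "pool = cores:     " << run(8, cores).count() << " ms\n";     // ~200 ms
    std::cout << "pool = 2 * cores: " << run(8, cores * 2).count() << " ms\n"; // ~100 ms
}

Blocked workers gain from the doubled pool, while CPU-bound tasks would gain nothing past core_num; splitting the schedulers lets each pool be sized for its own workload.
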
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 5cf23ca4c41..b7bf59a80a3 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -531,8 +531,8 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
// 1 create thread pool
if (_task_sched == nullptr) {
std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
-                std::make_unique<pipeline::TaskScheduler>(pipeline_exec_thread_num, "p_" + wg_name,
-                                                          cg_cpu_ctl_ptr);
+                std::make_unique<pipeline::HybridTaskScheduler>(pipeline_exec_thread_num,
+                                                                "p_" + wg_name, cg_cpu_ctl_ptr);
Status ret = pipeline_task_scheduler->start();
if (ret.ok()) {
_task_sched = std::move(pipeline_task_scheduler);
@@ -650,9 +650,12 @@ void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sc
std::string WorkloadGroup::thread_debug_info() {
std::string str = "";
if (_task_sched != nullptr) {
- std::vector<int> exec_t_info = _task_sched->thread_debug_info();
-        str = fmt::format("[exec num:{}, real_num:{}, min_num:{}, max_num:{}],", exec_t_info[0],
-                          exec_t_info[1], exec_t_info[2], exec_t_info[3]);
+        auto exec_t_info = _task_sched->thread_debug_info();
+        for (const auto& info : exec_t_info) {
+            str = fmt::format(
+                    "scheduler: {}, info: [exec num:{}, real_num:{}, min_num:{}, max_num:{}],",
+                    info.first, info.second[0], info.second[1], info.second[2], info.second[3]);
+        }
+ }
}
if (_scan_task_sched != nullptr) {
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 6b6d8881685..151cd270f13 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -130,6 +130,8 @@ public:
}
    virtual Status execute(VExprContext* context, Block* block, int* result_column_id) = 0;
+    // `is_blockable` means this expr may block in `execute` (e.g. AI Function, Remote Function)
+ [[nodiscard]] virtual bool is_blockable() const { return false; }
    // execute current expr with inverted index to filter block. Given a roaring bitmap of match rows
    virtual Status evaluate_inverted_index(VExprContext* context, uint32_t segment_num_rows) {
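
The comment above cites AI functions and remote functions as the motivating cases. A hypothetical subclass showing how such an expr would override the hook (stand-in types only, not the Doris VExpr API; call_remote_udf is an invented placeholder):

// Hypothetical expr that blocks inside execute(), e.g. a remote UDF call.
// Expr/Block are simplified stand-ins, not the Doris VExpr API, and
// call_remote_udf is an invented placeholder.
#include <string>
#include <utility>

struct Block {};

class Expr {
public:
    virtual ~Expr() = default;
    virtual int execute(Block* block) = 0;
    // Defaults to false, as in the patch; only blocking exprs override it.
    virtual bool is_blockable() const { return false; }
};

class RemoteFunctionExpr : public Expr {
public:
    explicit RemoteFunctionExpr(std::string endpoint) : _endpoint(std::move(endpoint)) {}

    int execute(Block* block) override {
        (void)block;
        // A synchronous RPC parks the calling thread for the whole round
        // trip, which is why this expr reports itself as blockable:
        // call_remote_udf(_endpoint, block);  // hypothetical RPC
        return 0;
    }

    // Routes any task containing this expr to the blocking scheduler.
    bool is_blockable() const override { return true; }

private:
    std::string _endpoint;
};
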
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index 597d1a92888..4d33aec4ea5 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -71,6 +71,10 @@ Status VExprContext::execute(vectorized::Block* block, int* result_column_id) {
return st;
}
+bool VExprContext::is_blockable() const {
+ return _root->is_blockable();
+}
+
Status VExprContext::prepare(RuntimeState* state, const RowDescriptor& row_desc) {
_prepared = true;
Status st;
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index bfe75383b7e..144206fc430 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -172,6 +172,7 @@ public:
[[nodiscard]] Status open(RuntimeState* state);
[[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx);
[[nodiscard]] Status execute(Block* block, int* result_column_id);
+ [[nodiscard]] bool is_blockable() const;
VExprSPtr root() { return _root; }
void set_root(const VExprSPtr& expr) { _root = expr; }
diff --git a/be/test/pipeline/dummy_task_queue.h b/be/test/pipeline/dummy_task_queue.h
index 9b609f41fc8..500d29c24b6 100644
--- a/be/test/pipeline/dummy_task_queue.h
+++ b/be/test/pipeline/dummy_task_queue.h
@@ -16,6 +16,7 @@
// under the License.
#include "pipeline/task_queue.h"
+#include "pipeline/task_scheduler.h"
namespace doris::pipeline {
@@ -47,4 +48,22 @@ class DummyTaskQueue final : public MultiCoreTaskQueue {
return task;
}
};
+
+class MockTaskScheduler : public TaskScheduler {
+public:
+    MockTaskScheduler() : TaskScheduler() { _task_queue = std::make_unique<DummyTaskQueue>(1); }
+
+    Status submit(PipelineTaskSPtr task) override { return _task_queue->push_back(task); }
+
+ Status start() override { return Status::OK(); }
+
+ void stop() override {}
+
+    std::vector<std::pair<std::string, std::vector<int>>> thread_debug_info() override {
+ return {};
+ }
+
+private:
+ std::unique_ptr<DummyTaskQueue> _task_queue;
+};
} // namespace doris::pipeline
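
The MockTaskScheduler above completes a test seam: the query context holds a scheduler pointer, so the tests below can inject a mock that captures submitted tasks instead of running them (which is what the _query_ctx->_task_scheduler assignments in pipeline_task_test.cpp do). A generic stand-alone sketch of the same pattern, with hypothetical types rather than the Doris fixtures:

// Stand-alone sketch of the test seam: production code talks to a
// scheduler interface and tests inject a mock that records submissions
// instead of executing them. Hypothetical types, not the Doris fixtures.
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

struct Task {};
using TaskPtr = std::shared_ptr<Task>;

class Scheduler {
public:
    virtual ~Scheduler() = default;
    virtual void submit(TaskPtr task) = 0;
};

class MockScheduler : public Scheduler {
public:
    void submit(TaskPtr task) override { submitted.push_back(std::move(task)); }
    std::vector<TaskPtr> submitted; // tests inspect this instead of a real queue
};

struct QueryContext {
    Scheduler* scheduler = nullptr; // non-owning, mirrors _task_scheduler
};

int main() {
    auto mock = std::make_unique<MockScheduler>();
    QueryContext ctx;
    ctx.scheduler = mock.get(); // inject, as the tests do via _query_ctx

    ctx.scheduler->submit(std::make_shared<Task>()); // code under test submits
    assert(mock->submitted.size() == 1);             // test asserts on capture
    return 0;
}
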
diff --git a/be/test/pipeline/pipeline_task_test.cpp b/be/test/pipeline/pipeline_task_test.cpp
index f5a08fd56e6..e885a377725 100644
--- a/be/test/pipeline/pipeline_task_test.cpp
+++ b/be/test/pipeline/pipeline_task_test.cpp
@@ -53,7 +53,8 @@ public:
_query_ctx =
                QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
                                     true, fe_address, QuerySource::INTERNAL_FRONTEND);
- _task_queue = std::make_unique<DummyTaskQueue>(1);
+ _task_scheduler = std::make_unique<MockTaskScheduler>();
+ _query_ctx->_task_scheduler = _task_scheduler.get();
_build_fragment_context();
}
void TearDown() override {
@@ -83,7 +84,7 @@ private:
TUniqueId _query_id = TUniqueId();
std::unique_ptr<ThreadMemTrackerMgr> _thread_mem_tracker_mgr;
TQueryOptions _query_options;
- std::unique_ptr<DummyTaskQueue> _task_queue;
+ std::unique_ptr<MockTaskScheduler> _task_scheduler;
const std::string LOCALHOST = BackendOptions::get_localhost();
const int DUMMY_PORT = config::brpc_port;
};
@@ -306,7 +307,6 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) {
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
- task->set_task_queue(_task_queue.get());
{
// `execute` should be called after `prepare`
bool done = false;
@@ -439,7 +439,6 @@ TEST_F(PipelineTaskTest, TEST_TERMINATE) {
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
- task->set_task_queue(_task_queue.get());
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
@@ -504,7 +503,6 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) {
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
- task->set_task_queue(_task_queue.get());
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
@@ -549,7 +547,6 @@ TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
- task->set_task_queue(_task_queue.get());
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
@@ -630,7 +627,6 @@ TEST_F(PipelineTaskTest, TEST_SINK_EOF) {
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
- task->set_task_queue(_task_queue.get());
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
@@ -669,7 +665,8 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
_query_ctx =
                QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
                                     true, fe_address, QuerySource::INTERNAL_FRONTEND);
- _task_queue = std::make_unique<DummyTaskQueue>(1);
+ _task_scheduler = std::make_unique<MockTaskScheduler>();
+ _query_ctx->_task_scheduler = _task_scheduler.get();
_build_fragment_context();
TWorkloadGroupInfo twg_info;
@@ -711,7 +708,6 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
_runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
- task->set_task_queue(_task_queue.get());
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
@@ -802,7 +798,8 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
_query_ctx =
                QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
                                     true, fe_address, QuerySource::INTERNAL_FRONTEND);
- _task_queue = std::make_unique<DummyTaskQueue>(1);
+ _task_scheduler = std::make_unique<MockTaskScheduler>();
+ _query_ctx->_task_scheduler = _task_scheduler.get();
_build_fragment_context();
TWorkloadGroupInfo twg_info;
@@ -847,7 +844,6 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
_runtime_state->resize_op_id_to_local_state(-1);
    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
                                               profile.get(), shared_state_map, task_id);
- task->set_task_queue(_task_queue.get());
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
diff --git a/be/test/pipeline/pipeline_test.cpp b/be/test/pipeline/pipeline_test.cpp
index 391d99231f3..b59d9f3b7d6 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -66,7 +66,8 @@ public:
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
TRuntimeFilterParamsBuilder().build());
ExecEnv::GetInstance()->set_stream_mgr(_mgr.get());
- _task_queue = std::make_unique<DummyTaskQueue>(1);
+ _task_scheduler = std::make_unique<MockTaskScheduler>();
+ _query_ctx->_task_scheduler = _task_scheduler.get();
}
void TearDown() override {}
@@ -110,6 +111,8 @@ private:
_query_ctx =
                QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
                                     true, fe_address, QuerySource::INTERNAL_FRONTEND);
+ _task_scheduler = std::make_unique<MockTaskScheduler>();
+ _query_ctx->_task_scheduler = _task_scheduler.get();
_runtime_state.clear();
_context.clear();
_fragment_id = 0;
@@ -135,7 +138,7 @@ private:
std::shared_ptr<QueryContext> _query_ctx;
TUniqueId _query_id = TUniqueId();
TQueryOptions _query_options;
- std::unique_ptr<DummyTaskQueue> _task_queue;
+ std::unique_ptr<MockTaskScheduler> _task_scheduler;
// Fragment level
// Fragment0 -> Fragment1
@@ -279,7 +282,6 @@ TEST_F(PipelineTest, HAPPY_PATH) {
cur_pipe, task_id, local_runtime_state.get(), _context.back(),
                _pipeline_profiles[cur_pipe->id()].get(), shared_state_map, task_id);
cur_pipe->incr_created_tasks(task_id, task.get());
- task->set_task_queue(_task_queue.get());
_pipeline_tasks[cur_pipe->id()].push_back(std::move(task));
_runtime_states[cur_pipe->id()].push_back(std::move(local_runtime_state));
}
@@ -329,7 +331,8 @@ TEST_F(PipelineTest, HAPPY_PATH) {
EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_wait_to_start(), true);
_query_ctx->get_execution_dependency()->set_ready();
// Task is ready and be push into runnable task queue.
- EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+    EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) != nullptr,
+              true);
EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->_wait_to_start(), false);
bool eos = false;
@@ -351,7 +354,8 @@ TEST_F(PipelineTest, HAPPY_PATH) {
EXPECT_EQ(block.columns(), 0);
// Task is ready since a new block reached.
- EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+    EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) != nullptr,
+              true);
EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
[](const auto& dep) { return dep->ready(); }),
true);
@@ -379,7 +383,8 @@ TEST_F(PipelineTest, HAPPY_PATH) {
EXPECT_EQ(block.columns(), 0);
// Task is ready since a new block reached.
- EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+    EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) != nullptr,
+              true);
EXPECT_EQ(std::all_of(read_deps.cbegin(), read_deps.cend(),
[](const auto& dep) { return dep->ready(); }),
true);
@@ -448,7 +453,8 @@ TEST_F(PipelineTest, HAPPY_PATH) {
// Upstream task finished.
local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
- EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+    EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) != nullptr,
+              true);
{
// Blocked by exchange read dependency due to no data reached.
        EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->execute(&eos), Status::OK());
@@ -937,7 +943,6 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
                _pipelines[i], task_id, local_runtime_state.get(), _context.back(),
                _pipeline_profiles[_pipelines[i]->id()].get(), shared_state_map, j);
_pipelines[i]->incr_created_tasks(j, task.get());
- task->set_task_queue(_task_queue.get());
_pipeline_tasks[_pipelines[i]->id()].push_back(std::move(task));
_runtime_states[_pipelines[i]->id()].push_back(std::move(local_runtime_state));
}
@@ -993,7 +998,9 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
for (size_t i = 0; i < _pipelines.size(); i++) {
for (int j = 0; j < parallelism; j++) {
// Task is ready and be push into runnable task queue.
- EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+            EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) !=
+                              nullptr,
+                      true);
}
}
for (size_t i = 0; i < _pipelines.size(); i++) {
@@ -1056,9 +1063,11 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
        // Pipeline 0 is blocked by hash join dependency and is still waiting for upstream tasks done.
for (int j = 0; j < parallelism; j++) {
// Task is ready and be push into runnable task queue.
- EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+            EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0) !=
+                              nullptr,
+                      true);
}
- EXPECT_EQ(_task_queue->take(0), nullptr);
+        EXPECT_EQ(((MockTaskScheduler*)_query_ctx->_task_scheduler)->_task_queue->take(0), nullptr);
for (int j = 0; j < parallelism; j++) {
EXPECT_EQ(_pipeline_tasks[1][j]->_is_blocked(), false);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]