This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 77f187d9b47 [Refactor](exec) refactor the pipline code (#34402)
77f187d9b47 is described below

commit 77f187d9b474042d8286aa951dc68117de8b10dd
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Wed May 8 14:01:49 2024 +0800

    [Refactor](exec) refactor the pipline code (#34402)
---
 be/src/common/status.h             |  5 +----
 be/src/pipeline/exec/operator.cpp  |  6 +++--
 be/src/pipeline/exec/operator.h    | 10 ++++-----
 be/src/pipeline/pipeline_task.cpp  | 19 +++++++---------
 be/src/pipeline/pipeline_task.h    | 12 +++++-----
 be/src/pipeline/pipeline_tracing.h |  2 +-
 be/src/pipeline/task_queue.cpp     |  9 ++++----
 be/src/pipeline/task_queue.h       |  1 -
 be/src/pipeline/task_scheduler.cpp | 45 +++++++++++++-------------------------
 be/src/pipeline/task_scheduler.h   |  6 ++---
 10 files changed, 47 insertions(+), 68 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 835a371c844..c50412f31d6 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -560,10 +560,7 @@ inline std::string Status::to_string() const {
 }
 
 inline std::string Status::to_string_no_stack() const {
-    std::stringstream ss;
-    ss << '[' << code_as_string() << ']';
-    ss << msg();
-    return ss.str();
+    return fmt::format("[{}] {}", code_as_string(), msg());
 }
 
 // some generally useful macros
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index fadbf8c4677..45bad17ed01 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -418,7 +418,8 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
     constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, 
FakeSharedState>;
     if constexpr (!is_fake_shared) {
         if constexpr (std::is_same_v<LocalExchangeSharedState, 
SharedStateArg>) {
-            _shared_state = 
info.le_state_map[_parent->operator_id()].first.get();
+            DCHECK(info.le_state_map.find(_parent->operator_id()) != 
info.le_state_map.end());
+            _shared_state = 
info.le_state_map.at(_parent->operator_id()).first.get();
 
             _dependency = _shared_state->get_dep_by_channel_id(info.task_idx);
             _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
@@ -500,7 +501,8 @@ Status 
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
     constexpr auto is_fake_shared = std::is_same_v<SharedState, 
FakeSharedState>;
     if constexpr (!is_fake_shared) {
         if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) {
-            _dependency = 
info.le_state_map[_parent->dests_id().front()].second.get();
+            DCHECK(info.le_state_map.find(_parent->dests_id().front()) != 
info.le_state_map.end());
+            _dependency = 
info.le_state_map.at(_parent->dests_id().front()).second.get();
             _shared_state = (SharedState*)_dependency->shared_state();
         } else {
             _shared_state = info.shared_state->template cast<SharedState>();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e4560c571c7..1b99b849d26 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -65,10 +65,10 @@ using DataSinkOperatorXPtr = 
std::shared_ptr<DataSinkOperatorXBase>;
 // This struct is used only for initializing local state.
 struct LocalStateInfo {
     RuntimeProfile* parent_profile = nullptr;
-    const std::vector<TScanRangeParams> scan_ranges;
+    const std::vector<TScanRangeParams>& scan_ranges;
     BasicSharedState* shared_state;
-    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-            le_state_map;
+    const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
+                                  std::shared_ptr<Dependency>>>& le_state_map;
     const int task_idx;
 };
 
@@ -78,8 +78,8 @@ struct LocalSinkStateInfo {
     RuntimeProfile* parent_profile = nullptr;
     const int sender_id;
     BasicSharedState* shared_state;
-    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
-            le_state_map;
+    const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
+                                  std::shared_ptr<Dependency>>>& le_state_map;
     const TDataSink& tsink;
 };
 
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 2c211835c55..b8aa61b74c4 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -96,16 +96,15 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& 
local_params, const
         RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
     }
 
-    std::vector<TScanRangeParams> no_scan_ranges;
-    auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
-                                         _operators.front()->node_id(), 
no_scan_ranges);
+    _scan_ranges = find_with_default(local_params.per_node_scan_ranges,
+                                     _operators.front()->node_id(), 
_scan_ranges);
     auto* parent_profile = _state->get_sink_local_state()->profile();
     query_ctx->register_query_statistics(
             _state->get_sink_local_state()->get_query_statistics_ptr());
 
     for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
         auto& op = _operators[op_idx];
-        LocalStateInfo info {parent_profile, scan_ranges, 
get_op_shared_state(op->operator_id()),
+        LocalStateInfo info {parent_profile, _scan_ranges, 
get_op_shared_state(op->operator_id()),
                              _le_state_map, _task_idx};
         RETURN_IF_ERROR(op->setup_local_state(_state, info));
         parent_profile = _state->get_local_state(op->operator_id())->profile();
@@ -124,7 +123,7 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& 
local_params, const
 }
 
 Status PipelineTask::_extract_dependencies() {
-    for (auto op : _operators) {
+    for (auto& op : _operators) {
         auto result = _state->get_local_state_result(op->operator_id());
         if (!result) {
             return result.error();
@@ -152,12 +151,9 @@ Status PipelineTask::_extract_dependencies() {
 }
 
 void PipelineTask::_init_profile() {
-    std::stringstream ss;
-    ss << "PipelineTask"
-       << " (index=" << _index << ")";
-    auto* task_profile = new RuntimeProfile(ss.str());
-    _parent_profile->add_child(task_profile, true, nullptr);
-    _task_profile.reset(task_profile);
+    _task_profile =
+            std::make_unique<RuntimeProfile>(fmt::format("PipelineTask 
(index={})", _index));
+    _parent_profile->add_child(_task_profile.get(), true, nullptr);
     _task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime");
 
     static const char* exec_time = "ExecuteTime";
@@ -344,6 +340,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState* 
state, int64_t revocable_m
         return false;
     }
 }
+
 void PipelineTask::finalize() {
     std::unique_lock<std::mutex> lc(_release_lock);
     _finished = true;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 06c2cea3e2c..7be9593eeb2 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -138,13 +138,12 @@ public:
     }
 
     void set_previous_core_id(int id) {
-        if (id == _previous_schedule_id) {
-            return;
-        }
-        if (_previous_schedule_id != -1) {
-            COUNTER_UPDATE(_core_change_times, 1);
+        if (id != _previous_schedule_id) {
+            if (_previous_schedule_id != -1) {
+                COUNTER_UPDATE(_core_change_times, 1);
+            }
+            _previous_schedule_id = id;
         }
-        _previous_schedule_id = id;
     }
 
     void finalize();
@@ -381,6 +380,7 @@ private:
     // All shared states of this pipeline task.
     std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
     std::shared_ptr<BasicSharedState> _sink_shared_state;
+    std::vector<TScanRangeParams> _scan_ranges;
     std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
             _le_state_map;
     int _task_idx;
diff --git a/be/src/pipeline/pipeline_tracing.h 
b/be/src/pipeline/pipeline_tracing.h
index 27dda6b85f1..aad0a7f9ee8 100644
--- a/be/src/pipeline/pipeline_tracing.h
+++ b/be/src/pipeline/pipeline_tracing.h
@@ -39,7 +39,7 @@ struct ScheduleRecord {
     uint64_t thread_id;
     uint64_t start_time;
     uint64_t end_time;
-    std::string state_name;
+    std::string_view state_name;
 
     bool operator<(const ScheduleRecord& rhs) const { return start_time < 
rhs.start_time; }
     std::string to_string(uint64_t append_value) const {
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index c54df42e1b1..24d71144240 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -19,14 +19,14 @@
 
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
+#include <memory>
 #include <string>
 
 #include "common/logging.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/workload_group/workload_group.h"
 
-namespace doris {
-namespace pipeline {
+namespace doris::pipeline {
 
 TaskQueue::~TaskQueue() = default;
 
@@ -139,7 +139,7 @@ int PriorityTaskQueue::task_size() {
 MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
 
 MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : 
TaskQueue(core_size), _closed(false) {
-    _prio_task_queue_list.reset(new PriorityTaskQueue[core_size]);
+    _prio_task_queue_list = std::make_unique<PriorityTaskQueue[]>(core_size);
 }
 
 void MultiCoreTaskQueue::close() {
@@ -211,5 +211,4 @@ void MultiCoreTaskQueue::update_statistics(PipelineTask* 
task, int64_t time_spen
                                                                      
time_spent);
 }
 
-} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 481427eaf4b..74ed9187567 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -135,7 +135,6 @@ public:
     void close() override;
 
     // Get the task by core id.
-    // TODO: To think the logic is useful?
     PipelineTask* take(size_t core_id) override;
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index ed88f19bae4..fbb67afdf46 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -37,8 +37,6 @@
 #include "pipeline_fragment_context.h"
 #include "runtime/exec_env.h"
 #include "runtime/query_context.h"
-#include "util/debug_util.h"
-#include "util/sse_util.hpp"
 #include "util/thread.h"
 #include "util/threadpool.h"
 #include "util/time.h"
@@ -61,9 +59,8 @@ Status TaskScheduler::start() {
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
-    _markers.reserve(cores);
+    _markers.resize(cores, true);
     for (size_t i = 0; i < cores; ++i) {
-        _markers.push_back(std::make_unique<std::atomic<bool>>(true));
         RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); 
}));
     }
     return Status::OK();
@@ -101,8 +98,7 @@ void _close_task(PipelineTask* task, PipelineTaskState 
state, Status exec_status
 }
 
 void TaskScheduler::_do_work(size_t index) {
-    const auto& marker = _markers[index];
-    while (*marker) {
+    while (_markers[index]) {
         auto* task = _task_queue->take(index);
         if (!task) {
             continue;
@@ -120,27 +116,20 @@ void TaskScheduler::_do_work(size_t index) {
         auto state = task->get_state();
         // If the state is PENDING_FINISH, then the task is come from blocked 
queue, its is_pending_finish
         // has to return false. The task is finished and need to close now.
-        if (state == PipelineTaskState::PENDING_FINISH) {
-            Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-            _close_task(task, canceled ? PipelineTaskState::CANCELED : 
PipelineTaskState::FINISHED,
-                        exec_status);
-            continue;
-        }
-
-        DCHECK(state != PipelineTaskState::FINISHED && state != 
PipelineTaskState::CANCELED)
-                << "task already finish: " << task->debug_string();
-
-        if (canceled) {
+        if (state == PipelineTaskState::PENDING_FINISH || canceled) {
             // may change from pending FINISH,should called cancel
             // also may change form BLOCK, other task called cancel
 
             // If pipeline is canceled, it will report after pipeline closed, 
and will propagate
             // errors to downstream through exchange. So, here we needn't 
send_report.
             // fragment_ctx->send_report(true);
-            Status cancel_status = 
fragment_ctx->get_query_ctx()->exec_status();
-            _close_task(task, PipelineTaskState::CANCELED, cancel_status);
+            Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
+            _close_task(task, canceled ? PipelineTaskState::CANCELED : 
PipelineTaskState::FINISHED,
+                        exec_status);
             continue;
         }
+        DCHECK(state != PipelineTaskState::FINISHED && state != 
PipelineTaskState::CANCELED)
+                << "task already finish: " << task->debug_string();
 
         task->set_state(PipelineTaskState::RUNNABLE);
 
@@ -150,7 +139,7 @@ void TaskScheduler::_do_work(size_t index) {
 
         try {
             // This will enable exception handling logic in allocator.h when 
memory allocate
-            // failed or sysem memory is not sufficient.
+            // failed or system memory is not sufficient.
             doris::enable_thread_catch_bad_alloc++;
             Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};
             //TODO: use a better enclose to abstracting these
@@ -169,8 +158,7 @@ void TaskScheduler::_do_work(size_t index) {
                 status = task->execute(&eos);
 
                 uint64_t end_time = MonotonicMicros();
-                auto state = task->get_state();
-                std::string state_name = get_state_name(state);
+                std::string_view state_name = 
get_state_name(task->get_state());
                 ExecEnv::GetInstance()->pipeline_tracer_context()->record(
                         {query_id, task_name, core_id, thread_id, start_time, 
end_time,
                          state_name});
@@ -216,11 +204,8 @@ void TaskScheduler::_do_work(size_t index) {
                 task->set_state(PipelineTaskState::PENDING_FINISH);
                 task->set_running(false);
             } else {
-                // Close the task directly?
                 Status exec_status = 
fragment_ctx->get_query_ctx()->exec_status();
-                _close_task(task,
-                            canceled ? PipelineTaskState::CANCELED : 
PipelineTaskState::FINISHED,
-                            exec_status);
+                _close_task(task, PipelineTaskState::FINISHED, exec_status);
             }
             continue;
         }
@@ -243,13 +228,13 @@ void TaskScheduler::_do_work(size_t index) {
 }
 
 void TaskScheduler::stop() {
-    if (!this->_shutdown.load()) {
+    if (!_shutdown) {
         if (_task_queue) {
             _task_queue->close();
         }
         if (_fix_thread_pool) {
-            for (const auto& marker : _markers) {
-                marker->store(false);
+            for (size_t i = 0; i < _markers.size(); ++i) {
+                _markers[i] = false;
             }
             _fix_thread_pool->shutdown();
             _fix_thread_pool->wait();
@@ -258,7 +243,7 @@ void TaskScheduler::stop() {
         // pool is stopped. For example, if there are 2 threads call stop
         // then if one thread set shutdown = false, then another thread will
         // not check it and will free task scheduler.
-        this->_shutdown.store(true);
+        _shutdown = true;
     }
 }
 
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 8e513748203..1a9b9ad17db 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -50,7 +50,7 @@ public:
                   CgroupCpuCtl* cgroup_cpu_ctl)
             : _task_queue(std::move(task_queue)),
               _shutdown(false),
-              _name(name),
+              _name(std::move(name)),
               _cgroup_cpu_ctl(cgroup_cpu_ctl) {}
 
     ~TaskScheduler();
@@ -66,8 +66,8 @@ public:
 private:
     std::unique_ptr<ThreadPool> _fix_thread_pool;
     std::shared_ptr<TaskQueue> _task_queue;
-    std::vector<std::unique_ptr<std::atomic<bool>>> _markers;
-    std::atomic<bool> _shutdown;
+    std::vector<bool> _markers;
+    bool _shutdown;
     std::string _name;
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to