This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 77f187d9b47 [Refactor](exec) refactor the pipline code (#34402) 77f187d9b47 is described below commit 77f187d9b474042d8286aa951dc68117de8b10dd Author: HappenLee <happen...@hotmail.com> AuthorDate: Wed May 8 14:01:49 2024 +0800 [Refactor](exec) refactor the pipline code (#34402) --- be/src/common/status.h | 5 +---- be/src/pipeline/exec/operator.cpp | 6 +++-- be/src/pipeline/exec/operator.h | 10 ++++----- be/src/pipeline/pipeline_task.cpp | 19 +++++++--------- be/src/pipeline/pipeline_task.h | 12 +++++----- be/src/pipeline/pipeline_tracing.h | 2 +- be/src/pipeline/task_queue.cpp | 9 ++++---- be/src/pipeline/task_queue.h | 1 - be/src/pipeline/task_scheduler.cpp | 45 +++++++++++++------------------------- be/src/pipeline/task_scheduler.h | 6 ++--- 10 files changed, 47 insertions(+), 68 deletions(-) diff --git a/be/src/common/status.h b/be/src/common/status.h index 835a371c844..c50412f31d6 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -560,10 +560,7 @@ inline std::string Status::to_string() const { } inline std::string Status::to_string_no_stack() const { - std::stringstream ss; - ss << '[' << code_as_string() << ']'; - ss << msg(); - return ss.str(); + return fmt::format("[{}] {}", code_as_string(), msg()); } // some generally useful macros diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index fadbf8c4677..45bad17ed01 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -418,7 +418,8 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, FakeSharedState>; if constexpr (!is_fake_shared) { if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) { - _shared_state = info.le_state_map[_parent->operator_id()].first.get(); + DCHECK(info.le_state_map.find(_parent->operator_id()) != info.le_state_map.end()); + _shared_state = info.le_state_map.at(_parent->operator_id()).first.get(); _dependency = _shared_state->get_dep_by_channel_id(info.task_idx); _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( @@ -500,7 +501,8 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink constexpr auto is_fake_shared = std::is_same_v<SharedState, FakeSharedState>; if constexpr (!is_fake_shared) { if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) { - _dependency = info.le_state_map[_parent->dests_id().front()].second.get(); + DCHECK(info.le_state_map.find(_parent->dests_id().front()) != info.le_state_map.end()); + _dependency = info.le_state_map.at(_parent->dests_id().front()).second.get(); _shared_state = (SharedState*)_dependency->shared_state(); } else { _shared_state = info.shared_state->template cast<SharedState>(); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index e4560c571c7..1b99b849d26 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -65,10 +65,10 @@ using DataSinkOperatorXPtr = std::shared_ptr<DataSinkOperatorXBase>; // This struct is used only for initializing local state. struct LocalStateInfo { RuntimeProfile* parent_profile = nullptr; - const std::vector<TScanRangeParams> scan_ranges; + const std::vector<TScanRangeParams>& scan_ranges; BasicSharedState* shared_state; - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - le_state_map; + const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, + std::shared_ptr<Dependency>>>& le_state_map; const int task_idx; }; @@ -78,8 +78,8 @@ struct LocalSinkStateInfo { RuntimeProfile* parent_profile = nullptr; const int sender_id; BasicSharedState* shared_state; - std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> - le_state_map; + const std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, + std::shared_ptr<Dependency>>>& le_state_map; const TDataSink& tsink; }; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 2c211835c55..b8aa61b74c4 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -96,16 +96,15 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const RETURN_IF_ERROR(_sink->setup_local_state(_state, info)); } - std::vector<TScanRangeParams> no_scan_ranges; - auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, - _operators.front()->node_id(), no_scan_ranges); + _scan_ranges = find_with_default(local_params.per_node_scan_ranges, + _operators.front()->node_id(), _scan_ranges); auto* parent_profile = _state->get_sink_local_state()->profile(); query_ctx->register_query_statistics( _state->get_sink_local_state()->get_query_statistics_ptr()); for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; - LocalStateInfo info {parent_profile, scan_ranges, get_op_shared_state(op->operator_id()), + LocalStateInfo info {parent_profile, _scan_ranges, get_op_shared_state(op->operator_id()), _le_state_map, _task_idx}; RETURN_IF_ERROR(op->setup_local_state(_state, info)); parent_profile = _state->get_local_state(op->operator_id())->profile(); @@ -124,7 +123,7 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const } Status PipelineTask::_extract_dependencies() { - for (auto op : _operators) { + for (auto& op : _operators) { auto result = _state->get_local_state_result(op->operator_id()); if (!result) { return result.error(); @@ -152,12 +151,9 @@ Status PipelineTask::_extract_dependencies() { } void PipelineTask::_init_profile() { - std::stringstream ss; - ss << "PipelineTask" - << " (index=" << _index << ")"; - auto* task_profile = new RuntimeProfile(ss.str()); - _parent_profile->add_child(task_profile, true, nullptr); - _task_profile.reset(task_profile); + _task_profile = + std::make_unique<RuntimeProfile>(fmt::format("PipelineTask (index={})", _index)); + _parent_profile->add_child(_task_profile.get(), true, nullptr); _task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime"); static const char* exec_time = "ExecuteTime"; @@ -344,6 +340,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m return false; } } + void PipelineTask::finalize() { std::unique_lock<std::mutex> lc(_release_lock); _finished = true; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 06c2cea3e2c..7be9593eeb2 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -138,13 +138,12 @@ public: } void set_previous_core_id(int id) { - if (id == _previous_schedule_id) { - return; - } - if (_previous_schedule_id != -1) { - COUNTER_UPDATE(_core_change_times, 1); + if (id != _previous_schedule_id) { + if (_previous_schedule_id != -1) { + COUNTER_UPDATE(_core_change_times, 1); + } + _previous_schedule_id = id; } - _previous_schedule_id = id; } void finalize(); @@ -381,6 +380,7 @@ private: // All shared states of this pipeline task. std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states; std::shared_ptr<BasicSharedState> _sink_shared_state; + std::vector<TScanRangeParams> _scan_ranges; std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> _le_state_map; int _task_idx; diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h index 27dda6b85f1..aad0a7f9ee8 100644 --- a/be/src/pipeline/pipeline_tracing.h +++ b/be/src/pipeline/pipeline_tracing.h @@ -39,7 +39,7 @@ struct ScheduleRecord { uint64_t thread_id; uint64_t start_time; uint64_t end_time; - std::string state_name; + std::string_view state_name; bool operator<(const ScheduleRecord& rhs) const { return start_time < rhs.start_time; } std::string to_string(uint64_t append_value) const { diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index c54df42e1b1..24d71144240 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -19,14 +19,14 @@ // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep +#include <memory> #include <string> #include "common/logging.h" #include "pipeline/pipeline_task.h" #include "runtime/workload_group/workload_group.h" -namespace doris { -namespace pipeline { +namespace doris::pipeline { TaskQueue::~TaskQueue() = default; @@ -139,7 +139,7 @@ int PriorityTaskQueue::task_size() { MultiCoreTaskQueue::~MultiCoreTaskQueue() = default; MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) { - _prio_task_queue_list.reset(new PriorityTaskQueue[core_size]); + _prio_task_queue_list = std::make_unique<PriorityTaskQueue[]>(core_size); } void MultiCoreTaskQueue::close() { @@ -211,5 +211,4 @@ void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spen time_spent); } -} // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 481427eaf4b..74ed9187567 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -135,7 +135,6 @@ public: void close() override; // Get the task by core id. - // TODO: To think the logic is useful? PipelineTask* take(size_t core_id) override; // TODO combine these methods to `push_back(task, core_id = -1)` diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index ed88f19bae4..fbb67afdf46 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -37,8 +37,6 @@ #include "pipeline_fragment_context.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" -#include "util/debug_util.h" -#include "util/sse_util.hpp" #include "util/thread.h" #include "util/threadpool.h" #include "util/time.h" @@ -61,9 +59,8 @@ Status TaskScheduler::start() { .set_max_queue_size(0) .set_cgroup_cpu_ctl(_cgroup_cpu_ctl) .build(&_fix_thread_pool)); - _markers.reserve(cores); + _markers.resize(cores, true); for (size_t i = 0; i < cores; ++i) { - _markers.push_back(std::make_unique<std::atomic<bool>>(true)); RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); })); } return Status::OK(); @@ -101,8 +98,7 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status } void TaskScheduler::_do_work(size_t index) { - const auto& marker = _markers[index]; - while (*marker) { + while (_markers[index]) { auto* task = _task_queue->take(index); if (!task) { continue; @@ -120,27 +116,20 @@ void TaskScheduler::_do_work(size_t index) { auto state = task->get_state(); // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish // has to return false. The task is finished and need to close now. - if (state == PipelineTaskState::PENDING_FINISH) { - Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); - _close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED, - exec_status); - continue; - } - - DCHECK(state != PipelineTaskState::FINISHED && state != PipelineTaskState::CANCELED) - << "task already finish: " << task->debug_string(); - - if (canceled) { + if (state == PipelineTaskState::PENDING_FINISH || canceled) { // may change from pending FINISH,should called cancel // also may change form BLOCK, other task called cancel // If pipeline is canceled, it will report after pipeline closed, and will propagate // errors to downstream through exchange. So, here we needn't send_report. // fragment_ctx->send_report(true); - Status cancel_status = fragment_ctx->get_query_ctx()->exec_status(); - _close_task(task, PipelineTaskState::CANCELED, cancel_status); + Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); + _close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED, + exec_status); continue; } + DCHECK(state != PipelineTaskState::FINISHED && state != PipelineTaskState::CANCELED) + << "task already finish: " << task->debug_string(); task->set_state(PipelineTaskState::RUNNABLE); @@ -150,7 +139,7 @@ void TaskScheduler::_do_work(size_t index) { try { // This will enable exception handling logic in allocator.h when memory allocate - // failed or sysem memory is not sufficient. + // failed or system memory is not sufficient. doris::enable_thread_catch_bad_alloc++; Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; //TODO: use a better enclose to abstracting these @@ -169,8 +158,7 @@ void TaskScheduler::_do_work(size_t index) { status = task->execute(&eos); uint64_t end_time = MonotonicMicros(); - auto state = task->get_state(); - std::string state_name = get_state_name(state); + std::string_view state_name = get_state_name(task->get_state()); ExecEnv::GetInstance()->pipeline_tracer_context()->record( {query_id, task_name, core_id, thread_id, start_time, end_time, state_name}); @@ -216,11 +204,8 @@ void TaskScheduler::_do_work(size_t index) { task->set_state(PipelineTaskState::PENDING_FINISH); task->set_running(false); } else { - // Close the task directly? Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); - _close_task(task, - canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED, - exec_status); + _close_task(task, PipelineTaskState::FINISHED, exec_status); } continue; } @@ -243,13 +228,13 @@ void TaskScheduler::_do_work(size_t index) { } void TaskScheduler::stop() { - if (!this->_shutdown.load()) { + if (!_shutdown) { if (_task_queue) { _task_queue->close(); } if (_fix_thread_pool) { - for (const auto& marker : _markers) { - marker->store(false); + for (size_t i = 0; i < _markers.size(); ++i) { + _markers[i] = false; } _fix_thread_pool->shutdown(); _fix_thread_pool->wait(); @@ -258,7 +243,7 @@ void TaskScheduler::stop() { // pool is stopped. For example, if there are 2 threads call stop // then if one thread set shutdown = false, then another thread will // not check it and will free task scheduler. - this->_shutdown.store(true); + _shutdown = true; } } diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 8e513748203..1a9b9ad17db 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -50,7 +50,7 @@ public: CgroupCpuCtl* cgroup_cpu_ctl) : _task_queue(std::move(task_queue)), _shutdown(false), - _name(name), + _name(std::move(name)), _cgroup_cpu_ctl(cgroup_cpu_ctl) {} ~TaskScheduler(); @@ -66,8 +66,8 @@ public: private: std::unique_ptr<ThreadPool> _fix_thread_pool; std::shared_ptr<TaskQueue> _task_queue; - std::vector<std::unique_ptr<std::atomic<bool>>> _markers; - std::atomic<bool> _shutdown; + std::vector<bool> _markers; + bool _shutdown; std::string _name; CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org