This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3e819ab64b0 [minor](task) Complete debug string if task is finalized (#49950) 3e819ab64b0 is described below commit 3e819ab64b00a52ce622aa315f6fc81fe92f692e Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Fri Apr 11 17:29:18 2025 +0800 [minor](task) Complete debug string if task is finalized (#49950) 1. Add spill dependencies to run BE UT. 2. Add pipeline name in debug string if task is finalized. --- be/src/pipeline/exec/operator.h | 6 +++++ be/src/pipeline/pipeline.h | 1 + be/src/pipeline/pipeline_task.cpp | 43 +++++++++++++++------------------ be/src/pipeline/pipeline_task.h | 1 + be/test/pipeline/pipeline_task_test.cpp | 4 +-- 5 files changed, 29 insertions(+), 26 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 0625bcc0ad0..75a767aaa83 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -1100,12 +1100,15 @@ public: "DummyOperatorDependency", true); _filter_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "DummyOperatorDependency", true); + _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "DummyOperatorDependency", true); } Dependency* finishdependency() override { return _finish_dependency.get(); } ~DummyOperatorLocalState() = default; std::vector<Dependency*> dependencies() const override { return {_tmp_dependency.get()}; } std::vector<Dependency*> filter_dependencies() override { return {_filter_dependency.get()}; } + Dependency* spill_dependency() const override { return _spill_dependency.get(); } private: std::shared_ptr<Dependency> _tmp_dependency; @@ -1145,10 +1148,13 @@ public: "DummyOperatorDependency", true); _finish_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "DummyOperatorDependency", true); + _spill_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(), + "DummyOperatorDependency", true); } std::vector<Dependency*> dependencies() const override { return {_tmp_dependency.get()}; } Dependency* finishdependency() override { return _finish_dependency.get(); } + Dependency* spill_dependency() const override { return _spill_dependency.get(); } bool is_finished() const override { return _is_finished; } private: diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 7bde9323e94..7fff24cf8d9 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -138,6 +138,7 @@ public: } int num_tasks_of_parent() const { return _num_tasks_of_parent; } + std::string& name() { return _name; } private: void _init_profile(); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 94c5c6f7c75..852174137de 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -79,8 +79,8 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState _shared_state_map(std::move(shared_state_map)), _task_idx(task_idx), _execution_dep(state->get_query_ctx()->get_execution_dependency()), - _memory_sufficient_dependency( - state->get_query_ctx()->get_memory_sufficient_dependency()) { + _memory_sufficient_dependency(state->get_query_ctx()->get_memory_sufficient_dependency()), + _pipeline_name(_pipeline->name()) { _pipeline_task_watcher.start(); if (!_shared_state_map.contains(_sink->dests_id().front())) { @@ -291,24 +291,19 @@ void PipelineTask::terminate() { auto fragment = _fragment_context.lock(); if (!is_finalized() && fragment) { DCHECK(_wake_up_early || fragment->is_canceled()); - for (auto* dep : _spill_dependencies) { - dep->set_always_ready(); - } - - for (auto* dep : _filter_dependencies) { - dep->set_always_ready(); - } - for (auto& deps : _read_dependencies) { - for (auto* dep : deps) { - dep->set_always_ready(); - } - } - for (auto* dep : _write_dependencies) { - dep->set_always_ready(); - } - for (auto* 
dep : _finish_dependencies) { - dep->set_always_ready(); - } + std::for_each(_spill_dependencies.begin(), _spill_dependencies.end(), + [&](Dependency* dep) { dep->set_always_ready(); }); + std::for_each(_filter_dependencies.begin(), _filter_dependencies.end(), + [&](Dependency* dep) { dep->set_always_ready(); }); + std::for_each(_write_dependencies.begin(), _write_dependencies.end(), + [&](Dependency* dep) { dep->set_always_ready(); }); + std::for_each(_finish_dependencies.begin(), _finish_dependencies.end(), + [&](Dependency* dep) { dep->set_always_ready(); }); + std::for_each(_read_dependencies.begin(), _read_dependencies.end(), + [&](std::vector<Dependency*>& deps) { + std::for_each(deps.begin(), deps.end(), + [&](Dependency* dep) { dep->set_always_ready(); }); + }); _execution_dep->set_ready(); _memory_sufficient_dependency->set_ready(); } @@ -696,16 +691,16 @@ std::string PipelineTask::debug_string() { print_id(_state->fragment_instance_id())); fmt::format_to(debug_string_buffer, - "PipelineTask[this = {}, id = {}, open = {}, eos = {}, state = {}, dry run = " + "PipelineTask[id = {}, open = {}, eos = {}, state = {}, dry run = " "{}, _wake_up_early = {}, time elapsed since last state changing = {}s, spilling" " = {}, is running = {}]", - (void*)this, _index, _opened, _eos, _to_string(_exec_state), _dry_run, - _wake_up_early.load(), _state_change_watcher.elapsed_time() / NANOS_PER_SEC, - _spilling, is_running()); + _index, _opened, _eos, _to_string(_exec_state), _dry_run, _wake_up_early.load(), + _state_change_watcher.elapsed_time() / NANOS_PER_SEC, _spilling, is_running()); std::unique_lock<std::mutex> lc(_dependency_lock); auto* cur_blocked_dep = _blocked_dep; auto fragment = _fragment_context.lock(); if (is_finalized() || !fragment) { + fmt::format_to(debug_string_buffer, " pipeline name = {}", _pipeline_name); return fmt::to_string(debug_string_buffer); } auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC; diff --git a/be/src/pipeline/pipeline_task.h 
b/be/src/pipeline/pipeline_task.h index 4dbaa58feec..36a85f7321e 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -336,6 +336,7 @@ private: std::atomic<State> _exec_state = State::INITED; MonotonicStopWatch _state_change_watcher; std::atomic<bool> _spilling = false; + const std::string _pipeline_name; }; using PipelineTaskSPtr = std::shared_ptr<PipelineTask>; diff --git a/be/test/pipeline/pipeline_task_test.cpp b/be/test/pipeline/pipeline_task_test.cpp index f5a68d2da1b..3e16f21568a 100644 --- a/be/test/pipeline/pipeline_task_test.cpp +++ b/be/test/pipeline/pipeline_task_test.cpp @@ -263,7 +263,7 @@ TEST_F(PipelineTaskTest, TEST_OPEN) { EXPECT_FALSE(task->_read_dependencies.empty()); EXPECT_FALSE(task->_write_dependencies.empty()); EXPECT_FALSE(task->_finish_dependencies.empty()); - EXPECT_TRUE(task->_spill_dependencies.empty()); + EXPECT_FALSE(task->_spill_dependencies.empty()); EXPECT_TRUE(task->_opened); } } @@ -360,7 +360,7 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) { EXPECT_FALSE(task->_read_dependencies.empty()); EXPECT_FALSE(task->_write_dependencies.empty()); EXPECT_FALSE(task->_finish_dependencies.empty()); - EXPECT_TRUE(task->_spill_dependencies.empty()); + EXPECT_FALSE(task->_spill_dependencies.empty()); EXPECT_TRUE(task->_opened); EXPECT_FALSE(read_dep->ready()); EXPECT_TRUE(write_dep->ready()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org