This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new db1da6b787 [Chore](pipeline) add some profile log when pipeline canceled (#20825) db1da6b787 is described below commit db1da6b7874cf86c00246fb30f17446c2dce6de5 Author: Pxl <pxl...@qq.com> AuthorDate: Fri Jun 16 10:54:54 2023 +0800 [Chore](pipeline) add some profile log when pipeline canceled (#20825) add some profile log when pipeline canceled --- be/src/pipeline/exec/data_queue.cpp | 10 +++++---- be/src/pipeline/exec/empty_source_operator.h | 4 ++++ .../exec/multi_cast_data_stream_source.cpp | 4 ++++ .../pipeline/exec/multi_cast_data_stream_source.h | 2 ++ be/src/pipeline/exec/operator.cpp | 2 +- be/src/pipeline/exec/operator.h | 8 ++++++++ be/src/pipeline/pipeline_task.cpp | 24 ++++++++++++++++++---- be/src/pipeline/pipeline_task.h | 1 + 8 files changed, 46 insertions(+), 9 deletions(-) diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index 6ec6a5b2b1..bdcc12df95 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -83,13 +83,15 @@ bool DataQueue::has_data_or_finished(int child_idx) { //so next loop, will check the record idx + 1 first //maybe it's useful with many queue, others maybe always 0 bool DataQueue::remaining_has_data() { - int count = _child_count - 1; - while (count >= 0) { - _flag_queue_idx = (_flag_queue_idx + 1) % _child_count; + int count = _child_count; + while (--count >= 0) { + _flag_queue_idx++; + if (_flag_queue_idx == _child_count) { + _flag_queue_idx = 0; + } if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) { return true; } - count--; } return false; } diff --git a/be/src/pipeline/exec/empty_source_operator.h b/be/src/pipeline/exec/empty_source_operator.h index 4d93a310df..acd2c8cdfc 100644 --- a/be/src/pipeline/exec/empty_source_operator.h +++ b/be/src/pipeline/exec/empty_source_operator.h @@ -78,6 +78,10 @@ public: return Status::OK(); } + [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { + return _exec_node->runtime_profile(); + } + private: ExecNode* _exec_node = nullptr; }; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 9854d63120..06211faf52 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -67,4 +67,8 @@ Status MultiCastDataStreamerSourceOperator::close(doris::RuntimeState* state) { return OperatorBase::close(state); } +RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const { + return _multi_cast_data_streamer->profile(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 3198c4a408..15bd320b89 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -71,6 +71,8 @@ public: Status close(doris::RuntimeState* state) override; + [[nodiscard]] RuntimeProfile* get_runtime_profile() const override; + private: const int _consumer_id; std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer; diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 40da74ffb0..765f3421b8 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -49,7 +49,7 @@ const RowDescriptor& OperatorBase::row_desc() { std::string OperatorBase::debug_string() const { std::stringstream ss; - ss << _operator_builder->get_name() << ", is_source: " << is_source(); + ss << _operator_builder->get_name() << ": is_source: " << is_source(); ss << ", is_sink: " << is_sink() << ", is_closed: " << _is_closed; ss << ", is_pending_finish: " << is_pending_finish(); return ss.str(); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 0a31435b8f..fcd22eedcb 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -242,6 +242,8 @@ public: virtual std::string debug_string() const; int32_t id() const { return _operator_builder->id(); } + [[nodiscard]] virtual RuntimeProfile* get_runtime_profile() const = 0; + protected: OperatorBuilderBase* _operator_builder; OperatorPtr _child; @@ -293,6 +295,8 @@ public: Status finalize(RuntimeState* state) override { return Status::OK(); } + [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { return _sink->profile(); } + protected: NodeType* _sink; }; @@ -356,6 +360,10 @@ public: bool can_read() override { return _node->can_read(); } + [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { + return _node->runtime_profile(); + } + protected: NodeType* _node; bool _use_projection; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 7c3cc2abe7..9c8c23ec7e 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -32,6 +32,7 @@ #include "runtime/thread_context.h" #include "task_queue.h" #include "util/defer_op.h" +#include "util/runtime_profile.h" namespace doris { class RuntimeState; @@ -64,6 +65,7 @@ void PipelineTask::_init_profile() { _prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time); _open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time); _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time); + _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", TUnit::UNIT); _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time); _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time); _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time); @@ -213,6 +215,7 @@ Status PipelineTask::execute(bool* eos) { // Pull block from operator chain { SCOPED_TIMER(_get_block_timer); + _get_block_counter->update(1); RETURN_IF_ERROR(_root->get_block(_state, block, _data_state)); } *eos = _data_state == SourceState::FINISHED; @@ -313,23 +316,36 @@ void PipelineTask::set_state(PipelineTaskState state) { std::string PipelineTask::debug_string() { fmt::memory_buffer debug_string_buffer; - std::stringstream profile_ss; - _fresh_profile_counter(); - _task_profile->pretty_print(&profile_ss, ""); fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id)); fmt::format_to(debug_string_buffer, "InstanceId: {}\n", print_id(fragment_context()->get_fragment_instance_id())); - fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str()); + fmt::format_to(debug_string_buffer, "RuntimeUsage: {}\n", + PrettyPrinter::print(get_runtime_ns(), TUnit::TIME_NS)); + { + std::stringstream profile_ss; + _fresh_profile_counter(); + _task_profile->pretty_print(&profile_ss, ""); + fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str()); + } fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = {}]\noperators: ", _index, get_state_name(_cur_state)); for (size_t i = 0; i < _operators.size(); i++) { fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '), _operators[i]->debug_string()); + std::stringstream profile_ss; + _operators[i]->get_runtime_profile()->pretty_print(&profile_ss, std::string(i * 2, ' ')); + fmt::format_to(debug_string_buffer, "\n{}", profile_ss.str()); } fmt::format_to(debug_string_buffer, "\n{}{}", std::string(_operators.size() * 2, ' '), _sink->debug_string()); + { + std::stringstream profile_ss; + _sink->get_runtime_profile()->pretty_print(&profile_ss, + std::string(_operators.size() * 2, ' ')); + fmt::format_to(debug_string_buffer, "\n{}", profile_ss.str()); + } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 9317a494da..65e9ad83ed 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -257,6 +257,7 @@ private: RuntimeProfile::Counter* _open_timer; RuntimeProfile::Counter* _exec_timer; RuntimeProfile::Counter* _get_block_timer; + RuntimeProfile::Counter* _get_block_counter; RuntimeProfile::Counter* _sink_timer; RuntimeProfile::Counter* _finalize_timer; RuntimeProfile::Counter* _close_timer; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org