This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new db1da6b787 [Chore](pipeline) add some profile log when pipeline canceled (#20825)
db1da6b787 is described below

commit db1da6b7874cf86c00246fb30f17446c2dce6de5
Author: Pxl <pxl...@qq.com>
AuthorDate: Fri Jun 16 10:54:54 2023 +0800

    [Chore](pipeline) add some profile log when pipeline canceled (#20825)
    
    add some profile log when pipeline canceled
---
 be/src/pipeline/exec/data_queue.cpp                | 10 +++++----
 be/src/pipeline/exec/empty_source_operator.h       |  4 ++++
 .../exec/multi_cast_data_stream_source.cpp         |  4 ++++
 .../pipeline/exec/multi_cast_data_stream_source.h  |  2 ++
 be/src/pipeline/exec/operator.cpp                  |  2 +-
 be/src/pipeline/exec/operator.h                    |  8 ++++++++
 be/src/pipeline/pipeline_task.cpp                  | 24 ++++++++++++++++++----
 be/src/pipeline/pipeline_task.h                    |  1 +
 8 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp
index 6ec6a5b2b1..bdcc12df95 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -83,13 +83,15 @@ bool DataQueue::has_data_or_finished(int child_idx) {
 //so next loop, will check the record idx + 1 first
 //maybe it's useful with many queue, others maybe always 0
 bool DataQueue::remaining_has_data() {
-    int count = _child_count - 1;
-    while (count >= 0) {
-        _flag_queue_idx = (_flag_queue_idx + 1) % _child_count;
+    int count = _child_count;
+    while (--count >= 0) {
+        _flag_queue_idx++;
+        if (_flag_queue_idx == _child_count) {
+            _flag_queue_idx = 0;
+        }
         if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
             return true;
         }
-        count--;
     }
     return false;
 }
diff --git a/be/src/pipeline/exec/empty_source_operator.h b/be/src/pipeline/exec/empty_source_operator.h
index 4d93a310df..acd2c8cdfc 100644
--- a/be/src/pipeline/exec/empty_source_operator.h
+++ b/be/src/pipeline/exec/empty_source_operator.h
@@ -78,6 +78,10 @@ public:
         return Status::OK();
     }
 
+    [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
+        return _exec_node->runtime_profile();
+    }
+
 private:
     ExecNode* _exec_node = nullptr;
 };
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 9854d63120..06211faf52 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -67,4 +67,8 @@ Status MultiCastDataStreamerSourceOperator::close(doris::RuntimeState* state) {
     return OperatorBase::close(state);
 }
 
+RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() 
const {
+    return _multi_cast_data_streamer->profile();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 3198c4a408..15bd320b89 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -71,6 +71,8 @@ public:
 
     Status close(doris::RuntimeState* state) override;
 
+    [[nodiscard]] RuntimeProfile* get_runtime_profile() const override;
+
 private:
     const int _consumer_id;
     std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 40da74ffb0..765f3421b8 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -49,7 +49,7 @@ const RowDescriptor& OperatorBase::row_desc() {
 
 std::string OperatorBase::debug_string() const {
     std::stringstream ss;
-    ss << _operator_builder->get_name() << ", is_source: " << is_source();
+    ss << _operator_builder->get_name() << ": is_source: " << is_source();
     ss << ", is_sink: " << is_sink() << ", is_closed: " << _is_closed;
     ss << ", is_pending_finish: " << is_pending_finish();
     return ss.str();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 0a31435b8f..fcd22eedcb 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -242,6 +242,8 @@ public:
     virtual std::string debug_string() const;
     int32_t id() const { return _operator_builder->id(); }
 
+    [[nodiscard]] virtual RuntimeProfile* get_runtime_profile() const = 0;
+
 protected:
     OperatorBuilderBase* _operator_builder;
     OperatorPtr _child;
@@ -293,6 +295,8 @@ public:
 
     Status finalize(RuntimeState* state) override { return Status::OK(); }
 
+    [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { 
return _sink->profile(); }
+
 protected:
     NodeType* _sink;
 };
@@ -356,6 +360,10 @@ public:
 
     bool can_read() override { return _node->can_read(); }
 
+    [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
+        return _node->runtime_profile();
+    }
+
 protected:
     NodeType* _node;
     bool _use_projection;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 7c3cc2abe7..9c8c23ec7e 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -32,6 +32,7 @@
 #include "runtime/thread_context.h"
 #include "task_queue.h"
 #include "util/defer_op.h"
+#include "util/runtime_profile.h"
 
 namespace doris {
 class RuntimeState;
@@ -64,6 +65,7 @@ void PipelineTask::_init_profile() {
     _prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time);
     _open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time);
     _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", 
exec_time);
+    _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", 
TUnit::UNIT);
     _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
     _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", 
exec_time);
     _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
@@ -213,6 +215,7 @@ Status PipelineTask::execute(bool* eos) {
         // Pull block from operator chain
         {
             SCOPED_TIMER(_get_block_timer);
+            _get_block_counter->update(1);
             RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
         }
         *eos = _data_state == SourceState::FINISHED;
@@ -313,23 +316,36 @@ void PipelineTask::set_state(PipelineTaskState state) {
 
 std::string PipelineTask::debug_string() {
     fmt::memory_buffer debug_string_buffer;
-    std::stringstream profile_ss;
-    _fresh_profile_counter();
-    _task_profile->pretty_print(&profile_ss, "");
 
     fmt::format_to(debug_string_buffer, "QueryId: {}\n", 
print_id(query_context()->query_id));
     fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
                    print_id(fragment_context()->get_fragment_instance_id()));
 
-    fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
+    fmt::format_to(debug_string_buffer, "RuntimeUsage: {}\n",
+                   PrettyPrinter::print(get_runtime_ns(), TUnit::TIME_NS));
+    {
+        std::stringstream profile_ss;
+        _fresh_profile_counter();
+        _task_profile->pretty_print(&profile_ss, "");
+        fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
+    }
     fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = 
{}]\noperators: ", _index,
                    get_state_name(_cur_state));
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
                        _operators[i]->debug_string());
+        std::stringstream profile_ss;
+        _operators[i]->get_runtime_profile()->pretty_print(&profile_ss, 
std::string(i * 2, ' '));
+        fmt::format_to(debug_string_buffer, "\n{}", profile_ss.str());
     }
     fmt::format_to(debug_string_buffer, "\n{}{}", 
std::string(_operators.size() * 2, ' '),
                    _sink->debug_string());
+    {
+        std::stringstream profile_ss;
+        _sink->get_runtime_profile()->pretty_print(&profile_ss,
+                                                   
std::string(_operators.size() * 2, ' '));
+        fmt::format_to(debug_string_buffer, "\n{}", profile_ss.str());
+    }
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 9317a494da..65e9ad83ed 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -257,6 +257,7 @@ private:
     RuntimeProfile::Counter* _open_timer;
     RuntimeProfile::Counter* _exec_timer;
     RuntimeProfile::Counter* _get_block_timer;
+    RuntimeProfile::Counter* _get_block_counter;
     RuntimeProfile::Counter* _sink_timer;
     RuntimeProfile::Counter* _finalize_timer;
     RuntimeProfile::Counter* _close_timer;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to