This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e6644b1eea [pipelineX](profile) Improve exchange sink profile (#26117)
7e6644b1eea is described below

commit 7e6644b1eea088b62ef59cf5b87cadd354b3e428
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Oct 31 14:10:42 2023 +0800

    [pipelineX](profile) Improve exchange sink profile (#26117)
---
 be/src/pipeline/exec/exchange_source_operator.cpp |  2 +-
 be/src/pipeline/exec/exchange_source_operator.h   | 17 +++--------------
 be/src/pipeline/exec/hashjoin_build_sink.h        |  1 -
 be/src/pipeline/exec/repeat_operator.cpp          |  1 -
 be/src/pipeline/exec/repeat_operator.h            |  1 +
 be/src/pipeline/pipeline_x/operator.cpp           |  8 ++++++--
 6 files changed, 11 insertions(+), 19 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 362853fa18e..86d7c3728d9 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -64,7 +64,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
         static const std::string timer_name =
                 "WaitForDependency[" + source_dependency->name() + "]Time";
         _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
-        metrics[i] = ADD_CHILD_TIMER(_runtime_profile, "WaitForData", timer_name);
+        metrics[i] = ADD_CHILD_TIMER(_runtime_profile, fmt::format("WaitForData{}", i), timer_name);
     }
     RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
             state, vsort_exec_exprs));
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 12c6c38e4bc..c41268f8eac 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -54,16 +54,8 @@ struct ExchangeDataDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ExchangeDataDependency);
     ExchangeDataDependency(int id, vectorized::VDataStreamRecvr::SenderQueue* sender_queue)
-            : Dependency(id, "DataDependency"), _sender_queue(sender_queue), _always_done(false) {}
+            : Dependency(id, "DataDependency"), _always_done(false) {}
     void* shared_state() override { return nullptr; }
-    [[nodiscard]] Dependency* read_blocked_by() override {
-        if (config::enable_fuzzy_mode && _sender_queue->should_wait() &&
-            _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
-            LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
-                         << id();
-        }
-        return _sender_queue->should_wait() ? this : nullptr;
-    }
 
     void set_always_done() {
         _always_done = true;
@@ -74,17 +66,14 @@ public:
         _ready_for_read = true;
     }
 
-    void set_ready_for_read() override {
-        if (_always_done || !_ready_for_read) {
+    void block_reading() override {
+        if (_always_done) {
             return;
         }
         _ready_for_read = false;
-        // ScannerContext is set done outside this function now and only stop watcher here.
-        _read_dependency_watcher.start();
     }
 
 private:
-    vectorized::VDataStreamRecvr::SenderQueue* _sender_queue;
     std::atomic<bool> _always_done;
 };
 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 8ba6e2fba3b..10056a30e72 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -68,7 +68,6 @@ public:
     Status process_build_block(RuntimeState* state, vectorized::Block& block, uint8_t offset);
 
     void init_short_circuit_for_probe();
-    HashJoinBuildSinkOperatorX* join_build() { return (HashJoinBuildSinkOperatorX*)_parent; }
 
     bool build_unique() const;
     std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const;
diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp
index c9e0a38ec4c..ce00746380d 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -211,7 +211,6 @@ Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block
 Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block,
                              SourceState& source_state) const {
     auto& local_state = get_local_state(state);
-    SCOPED_TIMER(local_state.profile()->total_time_counter());
     auto& _repeat_id_idx = local_state._repeat_id_idx;
     auto& _child_block = *local_state._child_block;
     auto& _child_eos = local_state._child_eos;
diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h
index f6c52a0be8d..18d373b77df 100644
--- a/be/src/pipeline/exec/repeat_operator.h
+++ b/be/src/pipeline/exec/repeat_operator.h
@@ -70,6 +70,7 @@ private:
     std::unique_ptr<vectorized::Block> _intermediate_block {};
     vectorized::VExprContextSPtrs _expr_ctxs;
 };
+
 class RepeatOperatorX final : public StatefulOperatorX<RepeatLocalState> {
 public:
     using Base = StatefulOperatorX<RepeatLocalState>;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index c29e1b8af87..cc22c96debd 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -447,11 +447,15 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
             local_state._child_source_state != SourceState::FINISHED) {
             return Status::OK();
         }
-        RETURN_IF_ERROR(
-                push(state, local_state._child_block.get(), local_state._child_source_state));
+        {
+            SCOPED_TIMER(local_state.profile()->total_time_counter());
+            RETURN_IF_ERROR(
+                    push(state, local_state._child_block.get(), local_state._child_source_state));
+        }
     }
 
     if (!need_more_input_data(state)) {
+        SCOPED_TIMER(local_state.profile()->total_time_counter());
         SourceState new_state = SourceState::DEPEND_ON_SOURCE;
         RETURN_IF_ERROR(pull(state, block, new_state));
         if (new_state == SourceState::FINISHED) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to