This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7e6644b1eea [pipelineX](profile) Improve exchange sink profile (#26117) 7e6644b1eea is described below commit 7e6644b1eea088b62ef59cf5b87cadd354b3e428 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Oct 31 14:10:42 2023 +0800 [pipelineX](profile) Improve exchange sink profile (#26117) --- be/src/pipeline/exec/exchange_source_operator.cpp | 2 +- be/src/pipeline/exec/exchange_source_operator.h | 17 +++-------------- be/src/pipeline/exec/hashjoin_build_sink.h | 1 - be/src/pipeline/exec/repeat_operator.cpp | 1 - be/src/pipeline/exec/repeat_operator.h | 1 + be/src/pipeline/pipeline_x/operator.cpp | 8 ++++++-- 6 files changed, 11 insertions(+), 19 deletions(-) diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 362853fa18e..86d7c3728d9 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -64,7 +64,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { static const std::string timer_name = "WaitForDependency[" + source_dependency->name() + "]Time"; _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name); - metrics[i] = ADD_CHILD_TIMER(_runtime_profile, "WaitForData", timer_name); + metrics[i] = ADD_CHILD_TIMER(_runtime_profile, fmt::format("WaitForData{}", i), timer_name); } RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone( state, vsort_exec_exprs)); diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 12c6c38e4bc..c41268f8eac 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -54,16 +54,8 @@ struct ExchangeDataDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(ExchangeDataDependency); ExchangeDataDependency(int id, vectorized::VDataStreamRecvr::SenderQueue* sender_queue) - : Dependency(id, "DataDependency"), _sender_queue(sender_queue), _always_done(false) {} + : Dependency(id, "DataDependency"), _always_done(false) {} void* shared_state() override { return nullptr; } - [[nodiscard]] Dependency* read_blocked_by() override { - if (config::enable_fuzzy_mode && _sender_queue->should_wait() && - _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) { - LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " " - << id(); - } - return _sender_queue->should_wait() ? this : nullptr; - } void set_always_done() { _always_done = true; @@ -74,17 +66,14 @@ public: _ready_for_read = true; } - void set_ready_for_read() override { - if (_always_done || !_ready_for_read) { + void block_reading() override { + if (_always_done) { return; } _ready_for_read = false; - // ScannerContext is set done outside this function now and only stop watcher here. - _read_dependency_watcher.start(); } private: - vectorized::VDataStreamRecvr::SenderQueue* _sender_queue; std::atomic<bool> _always_done; }; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 8ba6e2fba3b..10056a30e72 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -68,7 +68,6 @@ public: Status process_build_block(RuntimeState* state, vectorized::Block& block, uint8_t offset); void init_short_circuit_for_probe(); - HashJoinBuildSinkOperatorX* join_build() { return (HashJoinBuildSinkOperatorX*)_parent; } bool build_unique() const; std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const; diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index c9e0a38ec4c..ce00746380d 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -211,7 +211,6 @@ Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block, SourceState& source_state) const { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); auto& _repeat_id_idx = local_state._repeat_id_idx; auto& _child_block = *local_state._child_block; auto& _child_eos = local_state._child_eos; diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h index f6c52a0be8d..18d373b77df 100644 --- a/be/src/pipeline/exec/repeat_operator.h +++ b/be/src/pipeline/exec/repeat_operator.h @@ -70,6 +70,7 @@ private: std::unique_ptr<vectorized::Block> _intermediate_block {}; vectorized::VExprContextSPtrs _expr_ctxs; }; + class RepeatOperatorX final : public StatefulOperatorX<RepeatLocalState> { public: using Base = StatefulOperatorX<RepeatLocalState>; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index c29e1b8af87..cc22c96debd 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -447,11 +447,15 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori local_state._child_source_state != SourceState::FINISHED) { return Status::OK(); } - RETURN_IF_ERROR( - push(state, local_state._child_block.get(), local_state._child_source_state)); + { + SCOPED_TIMER(local_state.profile()->total_time_counter()); + RETURN_IF_ERROR( + push(state, local_state._child_block.get(), local_state._child_source_state)); + } } if (!need_more_input_data(state)) { + SCOPED_TIMER(local_state.profile()->total_time_counter()); SourceState new_state = SourceState::DEPEND_ON_SOURCE; RETURN_IF_ERROR(pull(state, block, new_state)); if (new_state == SourceState::FINISHED) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org