This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new d809bee46e3 [refactor](profilev2) add BlocksProduced RowsProduced counter #27291
d809bee46e3 is described below

commit d809bee46e318c9cab7becdc0426e0cdce3e511d
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Tue Nov 21 12:01:11 2023 +0800

    [refactor](profilev2) add BlocksProduced RowsProduced counter #27291
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 2 ++
 be/src/pipeline/exec/exchange_sink_operator.h | 2 ++
 be/src/pipeline/exec/result_sink_operator.cpp | 5 ++++-
 be/src/pipeline/exec/result_sink_operator.h | 4 ++++
 4 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 10cc44b80ba..42ab71a4547 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -114,6 +114,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _split_block_distribute_by_channel_timer =
             ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
+    _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
             std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
@@ -309,6 +310,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                                    SourceState source_state) {
     auto& local_state = get_local_state(state);
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
+    COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
     SCOPED_TIMER(local_state.exec_time_counter());
     local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
     bool all_receiver_eof = true;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 1c7b6cbdaad..bc9c26e36ea 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -146,6 +146,7 @@ public:
     RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
     RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
+    RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
     RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; }
     RuntimeProfile::Counter* local_bytes_send_counter() { return _local_bytes_send_counter; }
     RuntimeProfile::Counter* local_sent_rows() { return _local_sent_rows; }
@@ -192,6 +193,7 @@ private:
     RuntimeProfile::Counter* _split_block_hash_compute_timer = nullptr;
     RuntimeProfile::Counter* _split_block_distribute_by_channel_timer = nullptr;
     RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
+    RuntimeProfile::Counter* _rows_sent_counter = nullptr;
     // Throughput per total time spent in sender
     RuntimeProfile::Counter* _overall_throughput = nullptr;
     // Used to counter send bytes under local data exchange
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index 321642bdba0..e9a51d56ad1 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -64,7 +64,8 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
             state->execution_timeout()));
     _result_sink_dependency =
             ResultSinkDependency::create_shared(_parent->operator_id(), _parent->node_id());
-
+    _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
+    _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     ((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
     return Status::OK();
 }
@@ -131,6 +132,8 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
                                  SourceState source_state) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
+    COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
+    COUNTER_UPDATE(local_state.blocks_sent_counter(), 1);
     if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
         RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
     }
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index 9bda54e79b6..93ce397b840 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -62,6 +62,8 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
     WriteDependency* dependency() override { return _result_sink_dependency.get(); }
+    RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
+    RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
 private:
     friend class ResultSinkOperatorX;
@@ -71,6 +73,8 @@ private:
     std::shared_ptr<BufferControlBlock> _sender;
     std::shared_ptr<ResultWriter> _writer;
     std::shared_ptr<ResultSinkDependency> _result_sink_dependency;
+    RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
+    RuntimeProfile::Counter* _rows_sent_counter = nullptr;
 };
 class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState> {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org