This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d809bee46e3 [refactor](profilev2) add BlocksProduced RowsProduced counter #27291
d809bee46e3 is described below

commit d809bee46e318c9cab7becdc0426e0cdce3e511d
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Tue Nov 21 12:01:11 2023 +0800

    [refactor](profilev2) add BlocksProduced RowsProduced counter #27291
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 2 ++
 be/src/pipeline/exec/exchange_sink_operator.h   | 2 ++
 be/src/pipeline/exec/result_sink_operator.cpp   | 5 ++++-
 be/src/pipeline/exec/result_sink_operator.h     | 4 ++++
 4 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 10cc44b80ba..42ab71a4547 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -114,6 +114,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _split_block_distribute_by_channel_timer =
             ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
+    _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
             std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
@@ -309,6 +310,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                                    SourceState source_state) {
     auto& local_state = get_local_state(state);
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
+    COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
     SCOPED_TIMER(local_state.exec_time_counter());
     local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
     bool all_receiver_eof = true;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 1c7b6cbdaad..bc9c26e36ea 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -146,6 +146,7 @@ public:
 
     RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
     RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
+    RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
     RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; }
     RuntimeProfile::Counter* local_bytes_send_counter() { return _local_bytes_send_counter; }
     RuntimeProfile::Counter* local_sent_rows() { return _local_sent_rows; }
@@ -192,6 +193,7 @@ private:
     RuntimeProfile::Counter* _split_block_hash_compute_timer = nullptr;
     RuntimeProfile::Counter* _split_block_distribute_by_channel_timer = nullptr;
     RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
+    RuntimeProfile::Counter* _rows_sent_counter = nullptr;
     // Throughput per total time spent in sender
     RuntimeProfile::Counter* _overall_throughput = nullptr;
     // Used to counter send bytes under local data exchange
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index 321642bdba0..e9a51d56ad1 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -64,7 +64,8 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
             state->execution_timeout()));
     _result_sink_dependency =
             ResultSinkDependency::create_shared(_parent->operator_id(), _parent->node_id());
-
+    _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
+    _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     ((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
     return Status::OK();
 }
@@ -131,6 +132,8 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
                                  SourceState source_state) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
+    COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
+    COUNTER_UPDATE(local_state.blocks_sent_counter(), 1);
     if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
         RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
     }
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index 9bda54e79b6..93ce397b840 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -62,6 +62,8 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
     WriteDependency* dependency() override { return _result_sink_dependency.get(); }
+    RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
+    RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
 
 private:
     friend class ResultSinkOperatorX;
@@ -71,6 +73,8 @@ private:
     std::shared_ptr<BufferControlBlock> _sender;
     std::shared_ptr<ResultWriter> _writer;
     std::shared_ptr<ResultSinkDependency> _result_sink_dependency;
+    RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
+    RuntimeProfile::Counter* _rows_sent_counter = nullptr;
 };
 
 class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState> {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to