This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new acf07cab6fe [refactor](minor) Init counter in prepare phase (#39287) 
(#39385)
acf07cab6fe is described below

commit acf07cab6fe8f175952958e46f7e9872d8a03d06
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Aug 15 13:36:12 2024 +0800

    [refactor](minor) Init counter in prepare phase (#39287) (#39385)
    
    pick #39287
---
 be/src/pipeline/exec/exchange_sink_operator.cpp                | 9 +++------
 be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp | 2 ++
 be/src/vec/common/sort/sorter.cpp                              | 2 ++
 3 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e457f090af5..e4150b4f7ac 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -154,7 +154,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     // Make sure brpc stub is ready before execution.
     for (int i = 0; i < channels.size(); ++i) {
         RETURN_IF_ERROR(channels[i]->init_stub(state));
+        _wait_channel_timer.push_back(_profile->add_nonzero_counter(
+                fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit 
::TIME_NS, timer_name, 1));
     }
+    _wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, 
"WaitForBroadcastBuffer", timer_name);
     return Status::OK();
 }
 
@@ -201,9 +204,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
         for (int i = 0; i < config::num_broadcast_buffer; ++i) {
             
_broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
         }
-
-        _wait_broadcast_buffer_timer =
-                ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", 
timer_name);
     } else if (local_size > 0) {
         size_t dep_id = 0;
         for (auto* channel : channels) {
@@ -211,9 +211,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                 if (auto dep = channel->get_local_channel_dependency()) {
                     _local_channels_dependency.push_back(dep);
                     DCHECK(_local_channels_dependency[dep_id] != nullptr);
-                    
_wait_channel_timer.push_back(_profile->add_nonzero_counter(
-                            fmt::format("WaitForLocalExchangeBuffer{}", 
dep_id), TUnit ::TIME_NS,
-                            timer_name, 1));
                     dep_id++;
                 } else {
                     LOG(WARNING) << "local recvr is null: query id = "
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 043a28a5d9b..83de348dbdb 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -32,6 +32,7 @@ 
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase
             std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
                                          parent->get_name() + 
"_SPILL_DEPENDENCY", true);
 }
+
 Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
                                           doris::pipeline::LocalSinkStateInfo& 
info) {
     RETURN_IF_ERROR(Base::init(state, info));
@@ -64,6 +65,7 @@ Status PartitionedAggSinkLocalState::open(RuntimeState* 
state) {
     SCOPED_TIMER(Base::_open_timer);
     return Base::open(state);
 }
+
 Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status 
exec_status) {
     SCOPED_TIMER(Base::exec_time_counter());
     SCOPED_TIMER(Base::_close_timer);
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index cfbd3cb41c8..eca7e15626b 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -66,6 +66,7 @@ void MergeSorterState::reset() {
     unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
     in_mem_sorted_bocks_size_ = 0;
 }
+
 Status MergeSorterState::add_sorted_block(Block& block) {
     auto rows = block.rows();
     if (0 == rows) {
@@ -279,6 +280,7 @@ Status FullSorter::_do_sort() {
     }
     return Status::OK();
 }
+
 size_t FullSorter::data_size() const {
     return _state->data_size();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to