This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new acf07cab6fe [refactor](minor) Init counter in prepare phase (#39287) (#39385) acf07cab6fe is described below commit acf07cab6fe8f175952958e46f7e9872d8a03d06 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Aug 15 13:36:12 2024 +0800 [refactor](minor) Init counter in prepare phase (#39287) (#39385) pick #39287 --- be/src/pipeline/exec/exchange_sink_operator.cpp | 9 +++------ be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp | 2 ++ be/src/vec/common/sort/sorter.cpp | 2 ++ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e457f090af5..e4150b4f7ac 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -154,7 +154,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf // Make sure brpc stub is ready before execution. for (int i = 0; i < channels.size(); ++i) { RETURN_IF_ERROR(channels[i]->init_stub(state)); + _wait_channel_timer.push_back(_profile->add_nonzero_counter( + fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit ::TIME_NS, timer_name, 1)); } + _wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name); return Status::OK(); } @@ -201,9 +204,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { for (int i = 0; i < config::num_broadcast_buffer; ++i) { _broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared()); } - - _wait_broadcast_buffer_timer = - ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name); } else if (local_size > 0) { size_t dep_id = 0; for (auto* channel : channels) { @@ -211,9 +211,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { if (auto dep = channel->get_local_channel_dependency()) { _local_channels_dependency.push_back(dep); DCHECK(_local_channels_dependency[dep_id] != nullptr); - _wait_channel_timer.push_back(_profile->add_nonzero_counter( - fmt::format("WaitForLocalExchangeBuffer{}", dep_id), TUnit ::TIME_NS, - timer_name, 1)); dep_id++; } else { LOG(WARNING) << "local recvr is null: query id = " diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 043a28a5d9b..83de348dbdb 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -32,6 +32,7 @@ PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase std::make_shared<Dependency>(parent->operator_id(), parent->node_id(), parent->get_name() + "_SPILL_DEPENDENCY", true); } + Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state, doris::pipeline::LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); @@ -64,6 +65,7 @@ Status PartitionedAggSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(Base::_open_timer); return Base::open(state); } + Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_close_timer); diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index cfbd3cb41c8..eca7e15626b 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -66,6 +66,7 @@ void MergeSorterState::reset() { unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty()); in_mem_sorted_bocks_size_ = 0; } + Status MergeSorterState::add_sorted_block(Block& block) { auto rows = block.rows(); if (0 == rows) { @@ -279,6 +280,7 @@ Status FullSorter::_do_sort() { } return Status::OK(); } + size_t FullSorter::data_size() const { return _state->data_size(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org