This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6828250207a [profile](refactor) Fix invalid shuffle profile (#26298) 6828250207a is described below commit 6828250207a37985437657c645d3690efd128e00 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Nov 2 17:07:05 2023 +0800 [profile](refactor) Fix invalid shuffle profile (#26298) --- be/src/pipeline/exec/exchange_sink_operator.cpp | 8 +++++--- be/src/pipeline/exec/result_file_sink_operator.cpp | 5 +++++ be/src/pipeline/exec/result_file_sink_operator.h | 10 +++++++++ be/src/vec/sink/vdata_stream_sender.cpp | 24 ++++++++-------------- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e8d52b8eaac..9e05d682864 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -392,9 +392,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } else if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { auto rows = block->rows(); - SCOPED_TIMER(local_state._split_block_hash_compute_timer); - RETURN_IF_ERROR( - local_state._partitioner->do_partitioning(state, block, _mem_tracker.get())); + { + SCOPED_TIMER(local_state._split_block_hash_compute_timer); + RETURN_IF_ERROR( + local_state._partitioner->do_partitioning(state, block, _mem_tracker.get())); + } if (_part_type == TPartitionType::HASH_PARTITIONED) { RETURN_IF_ERROR(channel_add_rows(state, local_state.channels, local_state._partition_count, diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index e572bfe93e5..217696c6939 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -106,6 +106,11 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i _sender_id = info.sender_id; _brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait"); + _local_send_timer = ADD_TIMER(_profile, "LocalSendTime"); + _brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime"); + _split_block_distribute_by_channel_timer = + ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime"); + _brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime"); auto& p = _parent->cast<ResultFileSinkOperatorX>(); CHECK(p._file_opts.get() != nullptr); if (p._is_top_sink) { diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h index 3a98401de60..63217aad047 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.h +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -58,6 +58,12 @@ public: [[nodiscard]] int sender_id() const { return _sender_id; } RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; } + RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; } + RuntimeProfile::Counter* brpc_send_timer() { return _brpc_send_timer; } + RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; } + RuntimeProfile::Counter* split_block_distribute_by_channel_timer() { + return _split_block_distribute_by_channel_timer; + } private: friend class ResultFileSinkOperatorX; @@ -73,6 +79,10 @@ private: vectorized::BlockSerializer<ResultFileSinkLocalState> _serializer; std::unique_ptr<vectorized::BroadcastPBlockHolder> _block_holder; RuntimeProfile::Counter* _brpc_wait_timer; + RuntimeProfile::Counter* _local_send_timer; + RuntimeProfile::Counter* _brpc_send_timer; + RuntimeProfile::Counter* _merge_block_timer; + RuntimeProfile::Counter* _split_block_distribute_by_channel_timer; int _sender_id; }; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index ca565f82d41..a7066f981ba 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -126,9 +126,7 @@ Status Channel<Parent>::send_current_block(bool eos, Status exec_status) { template <typename Parent> Status Channel<Parent>::send_local_block(Status exec_status, bool eos) { - if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) { - SCOPED_TIMER(_parent->local_send_timer()); - } + SCOPED_TIMER(_parent->local_send_timer()); Block block = _serializer.get_block()->to_block(); _serializer.get_block()->set_muatable_columns(block.clone_empty_columns()); if (_recvr_is_valid()) { @@ -157,9 +155,7 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) { template <typename Parent> Status Channel<Parent>::send_local_block(Block* block) { - if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) { - SCOPED_TIMER(_parent->local_send_timer()); - } + SCOPED_TIMER(_parent->local_send_timer()); if (_recvr_is_valid()) { if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) { COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes()); @@ -176,9 +172,9 @@ Status Channel<Parent>::send_local_block(Block* block) { template <typename Parent> Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) { if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) { - SCOPED_TIMER(_parent->brpc_send_timer()); COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); } + SCOPED_TIMER(_parent->brpc_send_timer()); if (_closure == nullptr) { _closure = new RefCountClosure<PTransmitDataResult>(); @@ -631,8 +627,10 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } else if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { auto rows = block->rows(); - SCOPED_TIMER(_split_block_hash_compute_timer); - RETURN_IF_ERROR(_partitioner->do_partitioning(state, block, _mem_tracker.get())); + { + SCOPED_TIMER(_split_block_hash_compute_timer); + RETURN_IF_ERROR(_partitioner->do_partitioning(state, block, _mem_tracker.get())); + } if (_part_type == TPartitionType::HASH_PARTITIONED) { RETURN_IF_ERROR(channel_add_rows(state, _channels, _partition_count, (uint64_t*)_partitioner->get_channel_ids(), rows, @@ -729,16 +727,12 @@ Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); if (rows) { if (rows->size() > 0) { - if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) { - SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer()); - } + SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer()); const int* begin = &(*rows)[0]; _mutable_block->add_rows(block, begin, begin + rows->size()); } } else if (!block->empty()) { - if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) { - SCOPED_TIMER(_parent->merge_block_timer()); - } + SCOPED_TIMER(_parent->merge_block_timer()); RETURN_IF_ERROR(_mutable_block->merge(*block)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org