This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 21319baac7491c3e99ba94be2570be09e936bce2 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Sat Mar 9 14:49:54 2024 +0800 [pipelineX](partition sort) Add some necessary metrics (#32020) Add some necessary metrics --- be/src/pipeline/exec/partition_sort_sink_operator.cpp | 16 +++++++++------- be/src/pipeline/exec/partition_sort_sink_operator.h | 17 ++++++++--------- be/src/pipeline/exec/partition_sort_source_operator.cpp | 8 ++++++++ 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 2481302fd40..571f1ed1399 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -45,6 +45,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _build_timer = ADD_TIMER(_profile, "HashTableBuildTime"); _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime"); _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime"); + _passthrough_rows_counter = ADD_COUNTER(_profile, "PassThroughRowsCounter", TUnit::UNIT); _partition_sort_info = std::make_shared<vectorized::PartitionSortInfo>( &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, @@ -60,7 +61,11 @@ PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int ope _pool(pool), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _limit(tnode.limit), - _topn_phase(tnode.partition_sort_node.ptopn_phase) {} + _partition_exprs_num(tnode.partition_sort_node.partition_exprs.size()), + _topn_phase(tnode.partition_sort_node.ptopn_phase), + _has_global_limit(tnode.partition_sort_node.has_global_limit), + _top_n_algorithm(tnode.partition_sort_node.top_n_algorithm), + _partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {} Status 
PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); @@ -75,12 +80,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st if (tnode.partition_sort_node.__isset.partition_exprs) { RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees( tnode.partition_sort_node.partition_exprs, _partition_expr_ctxs)); - _partition_exprs_num = _partition_expr_ctxs.size(); } - _has_global_limit = tnode.partition_sort_node.has_global_limit; - _top_n_algorithm = tnode.partition_sort_node.top_n_algorithm; - _partition_inner_limit = tnode.partition_sort_node.partition_inner_limit; return Status::OK(); } @@ -101,15 +102,14 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* auto& local_state = get_local_state(state); auto current_rows = input_block->rows(); SCOPED_TIMER(local_state.exec_time_counter()); - COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); if (current_rows > 0) { + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); local_state.child_input_rows = local_state.child_input_rows + current_rows; if (UNLIKELY(_partition_exprs_num == 0)) { if (UNLIKELY(local_state._value_places.empty())) { local_state._value_places.push_back(_pool->add(new vectorized::PartitionBlocks( local_state._partition_sort_info, local_state._value_places.empty()))); } - //no partition key local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc()); } else { //just simply use partition num to check @@ -118,6 +118,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._num_partition > config::partition_topn_partition_threshold && local_state.child_input_rows < 10000 * local_state._num_partition) { { + COUNTER_UPDATE(local_state._passthrough_rows_counter, + (int64_t)input_block->rows()); std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex); local_state._shared_state->blocks_buffer.push(std::move(*input_block)); // buffer have data, source could read this. diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index b39001d4723..8602b096f51 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -78,8 +78,8 @@ private: RuntimeProfile::Counter* _build_timer = nullptr; RuntimeProfile::Counter* _emplace_key_timer = nullptr; RuntimeProfile::Counter* _selector_block_timer = nullptr; - RuntimeProfile::Counter* _hash_table_size_counter = nullptr; + RuntimeProfile::Counter* _passthrough_rows_counter = nullptr; void _init_hash_method(); }; @@ -108,19 +108,18 @@ private: friend class PartitionSortSinkLocalState; ObjectPool* _pool = nullptr; const RowDescriptor _row_descriptor; - int64_t _limit = -1; - int _partition_exprs_num = 0; - vectorized::VExprContextSPtrs _partition_expr_ctxs; - - TPartTopNPhase::type _topn_phase; + const int64_t _limit = -1; + const int _partition_exprs_num = 0; + const TPartTopNPhase::type _topn_phase; + const bool _has_global_limit = false; + const TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER; + const int64_t _partition_inner_limit = 0; + vectorized::VExprContextSPtrs _partition_expr_ctxs; // Expressions and parameters used for build _sort_description vectorized::VSortExecExprs _vsort_exec_exprs; std::vector<bool> _is_asc_order; std::vector<bool> _nulls_first; - TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER; - bool _has_global_limit = false; - int64_t _partition_inner_limit = 0; Status _split_block_by_partition(vectorized::Block* input_block, PartitionSortSinkLocalState& local_state, bool eos); diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index 166cd84fc4e..7fd03a11f7a 100644 --- 
a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -60,6 +60,10 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: local_state._dependency->block(); } } + if (!output_block->empty()) { + COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); + COUNTER_UPDATE(local_state.rows_returned_counter(), output_block->rows()); + } return Status::OK(); } } @@ -78,6 +82,10 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: *eos = local_state._shared_state->blocks_buffer.empty() && local_state._sort_idx >= local_state._shared_state->partition_sorts.size(); } + if (!output_block->empty()) { + COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); + COUNTER_UPDATE(local_state.rows_returned_counter(), output_block->rows()); + } return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org