This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 21319baac7491c3e99ba94be2570be09e936bce2
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sat Mar 9 14:49:54 2024 +0800

    [pipelineX](partition sort) Add some necessary metrics (#32020)
    
    Add some necessary metrics
---
 be/src/pipeline/exec/partition_sort_sink_operator.cpp   | 16 +++++++++-------
 be/src/pipeline/exec/partition_sort_sink_operator.h     | 17 ++++++++---------
 be/src/pipeline/exec/partition_sort_source_operator.cpp |  8 ++++++++
 3 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 2481302fd40..571f1ed1399 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -45,6 +45,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     _build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
     _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
     _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
+    _passthrough_rows_counter = ADD_COUNTER(_profile, 
"PassThroughRowsCounter", TUnit::UNIT);
     _partition_sort_info = std::make_shared<vectorized::PartitionSortInfo>(
             &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, 
p._nulls_first,
             p._child_x->row_desc(), state, _profile, p._has_global_limit, 
p._partition_inner_limit,
@@ -60,7 +61,11 @@ 
PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int ope
           _pool(pool),
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
           _limit(tnode.limit),
-          _topn_phase(tnode.partition_sort_node.ptopn_phase) {}
+          
_partition_exprs_num(tnode.partition_sort_node.partition_exprs.size()),
+          _topn_phase(tnode.partition_sort_node.ptopn_phase),
+          _has_global_limit(tnode.partition_sort_node.has_global_limit),
+          _top_n_algorithm(tnode.partition_sort_node.top_n_algorithm),
+          
_partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {}
 
 Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
@@ -75,12 +80,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* st
     if (tnode.partition_sort_node.__isset.partition_exprs) {
         RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
                 tnode.partition_sort_node.partition_exprs, 
_partition_expr_ctxs));
-        _partition_exprs_num = _partition_expr_ctxs.size();
     }
 
-    _has_global_limit = tnode.partition_sort_node.has_global_limit;
-    _top_n_algorithm = tnode.partition_sort_node.top_n_algorithm;
-    _partition_inner_limit = tnode.partition_sort_node.partition_inner_limit;
     return Status::OK();
 }
 
@@ -101,15 +102,14 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     auto& local_state = get_local_state(state);
     auto current_rows = input_block->rows();
     SCOPED_TIMER(local_state.exec_time_counter());
-    COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
     if (current_rows > 0) {
+        COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
         local_state.child_input_rows = local_state.child_input_rows + 
current_rows;
         if (UNLIKELY(_partition_exprs_num == 0)) {
             if (UNLIKELY(local_state._value_places.empty())) {
                 local_state._value_places.push_back(_pool->add(new 
vectorized::PartitionBlocks(
                         local_state._partition_sort_info, 
local_state._value_places.empty())));
             }
-            //no partition key
             local_state._value_places[0]->append_whole_block(input_block, 
_child_x->row_desc());
         } else {
             //just simply use partition num to check
@@ -118,6 +118,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                 local_state._num_partition > 
config::partition_topn_partition_threshold &&
                 local_state.child_input_rows < 10000 * 
local_state._num_partition) {
                 {
+                    COUNTER_UPDATE(local_state._passthrough_rows_counter,
+                                   (int64_t)input_block->rows());
                     std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
                     
local_state._shared_state->blocks_buffer.push(std::move(*input_block));
                     // buffer have data, source could read this.
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h
index b39001d4723..8602b096f51 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -78,8 +78,8 @@ private:
     RuntimeProfile::Counter* _build_timer = nullptr;
     RuntimeProfile::Counter* _emplace_key_timer = nullptr;
     RuntimeProfile::Counter* _selector_block_timer = nullptr;
-
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
+    RuntimeProfile::Counter* _passthrough_rows_counter = nullptr;
     void _init_hash_method();
 };
 
@@ -108,19 +108,18 @@ private:
     friend class PartitionSortSinkLocalState;
     ObjectPool* _pool = nullptr;
     const RowDescriptor _row_descriptor;
-    int64_t _limit = -1;
-    int _partition_exprs_num = 0;
-    vectorized::VExprContextSPtrs _partition_expr_ctxs;
-
-    TPartTopNPhase::type _topn_phase;
+    const int64_t _limit = -1;
+    const int _partition_exprs_num = 0;
+    const TPartTopNPhase::type _topn_phase;
+    const bool _has_global_limit = false;
+    const TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
+    const int64_t _partition_inner_limit = 0;
 
+    vectorized::VExprContextSPtrs _partition_expr_ctxs;
     // Expressions and parameters used for build _sort_description
     vectorized::VSortExecExprs _vsort_exec_exprs;
     std::vector<bool> _is_asc_order;
     std::vector<bool> _nulls_first;
-    TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
-    bool _has_global_limit = false;
-    int64_t _partition_inner_limit = 0;
 
     Status _split_block_by_partition(vectorized::Block* input_block,
                                      PartitionSortSinkLocalState& local_state, 
bool eos);
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 166cd84fc4e..7fd03a11f7a 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -60,6 +60,10 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::
                     local_state._dependency->block();
                 }
             }
+            if (!output_block->empty()) {
+                COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
+                COUNTER_UPDATE(local_state.rows_returned_counter(), 
output_block->rows());
+            }
             return Status::OK();
         }
     }
@@ -78,6 +82,10 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::
         *eos = local_state._shared_state->blocks_buffer.empty() &&
                local_state._sort_idx >= 
local_state._shared_state->partition_sorts.size();
     }
+    if (!output_block->empty()) {
+        COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
+        COUNTER_UPDATE(local_state.rows_returned_counter(), 
output_block->rows());
+    }
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to