This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a7c2fc928c [refactor](profile) Refactor of RuntimeFilter profile 
(#49777)
0a7c2fc928c is described below

commit 0a7c2fc928ce41dc84e25c1af3e337fcfaf57327
Author: zhiqiang <hezhiqi...@selectdb.com>
AuthorDate: Tue Apr 8 18:24:24 2025 +0800

    [refactor](profile) Refactor of RuntimeFilter profile (#49777)
    
    ### What problem does this PR solve?
    
    Refactor the runtime filter profile.
    
    1. Counters are shared by related objects.
    2. Update the RuntimeProfile only when the pipeline task is closed.
    3. Add a new Counter type, which is similar to info_string but can be
    added as a Counter.
    
    ```
    use ssb;
    SELECT c_city, s_city, d_year, SUM(lo_revenue) AS REVENUE
    FROM customer, lineorder, supplier, dates
    WHERE lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_orderdate = d_datekey
      AND (c_city = 'UNITED KI1' OR c_city = 'UNITED KI5')
      AND (s_city = 'UNITED KI1' OR s_city = 'UNITED KI5')
      AND d_year >= 1992 AND d_year <= 1997
    GROUP BY c_city, s_city, d_year
    ORDER BY d_year ASC, REVENUE DESC
    ```
    The execution profile will then contain a structured counter like this:
    ```text
    -  RuntimeFilterInfo:
          -  AcquireRuntimeFilter:  6.972ms
          -  RF0  AlwaysTrueFilterRows:  0
          -  RF0  FilterRows:  41
          -  RF0  Info:  Consumer:  ([id:  0,  state:  [READY],  type:  
MINMAX_FILTER,  column_type:  INT],  mode:  LOCAL,  state:  APPLIED)
          -  RF0  InputRows:  48.757K  (48757)
          -  RF0  WaitTime:  161.0ms
          -  RF1  AlwaysTrueFilterRows:  0
          -  RF1  FilterRows:  11.288919M  (11288919)
          -  RF1  Info:  Consumer:  ([id:  1,  state:  [READY],  type:  
IN_OR_BLOOM_FILTER(BLOOM_FILTER),  column_type:  INT,  bf_size:  1048576,  
build_bf_by_runtime_size:  true],  mode:  LOCAL,  state:  APPLIED)
          -  RF1  InputRows:  11.381544M  (11381544)
          -  RF1  WaitTime:  160.0ms
          -  RF2  AlwaysTrueFilterRows:  0
          -  RF2  FilterRows:  0
          -  RF2  Info:  Consumer:  ([id:  2,  state:  [READY],  type:  
MINMAX_FILTER,  column_type:  INT],  mode:  LOCAL,  state:  APPLIED)
          -  RF2  InputRows:  48.686K  (48686)
          -  RF2  WaitTime:  224.0ms
          -  RF3  AlwaysTrueFilterRows:  0
          -  RF3  FilterRows:  91.841K  (91841)
          -  RF3  Info:  Consumer:  ([id:  3,  state:  [READY],  type:  
IN_OR_BLOOM_FILTER(BLOOM_FILTER),  column_type:  INT,  bf_size:  1048576,  
build_bf_by_runtime_size:  true],  mode:  LOCAL,  state:  APPLIED)
          -  RF3  InputRows:  92.625K  (92625)
          -  RF3  WaitTime:  223.0ms
          -  RF4  AlwaysTrueFilterRows:  0
          -  RF4  FilterRows:  0
          -  RF4  Info:  Consumer:  ([id:  4,  state:  [READY],  type:  
MINMAX_FILTER,  column_type:  INT],  mode:  LOCAL,  state:  APPLIED)
          -  RF4  InputRows:  0
          -  RF4  WaitTime:  149.0ms
          -  RF5  AlwaysTrueFilterRows:  0
          -  RF5  FilterRows:  0
          -  RF5  Info:  Consumer:  ([id:  5,  state:  [READY],  type:  
IN_OR_BLOOM_FILTER(BLOOM_FILTER),  column_type:  INT,  bf_size:  1048576,  
build_bf_by_runtime_size:  true],  mode:  LOCAL,  state:  APPLIED)
          -  RF5  InputRows:  7
          -  RF5  WaitTime:  149.0ms
    ```
    The merged profile will look like this:
    ```
    -  RuntimeFilterInfo:  sum  ,  avg  ,  max  ,  min
        -  RF0  FilterRows:  sum  2.139K  (2139),  avg  44,  max  59,  min  30
        -  RF0  InputRows:  sum  2.340004M  (2340004),  avg  48.75K  (48750),  
max  48.76K  (48760),  min  48.741K  (48741)
        -  RF1  FilterRows:  sum  542.210415M  (542210415),  avg  11.29605M  
(11296050),  max  11.307756M  (11307756),  min  11.281476M  (11281476)
        -  RF1  InputRows:  sum  546.667366M  (546667366),  avg  11.388903M  
(11388903),  max  11.400674M  (11400674),  min  11.374343M  (11374343)
        -  RF2  FilterRows:  sum  109,  avg  2,  max  12,  min  0
        -  RF2  InputRows:  sum  2.336525M  (2336525),  avg  48.677K  (48677),  
max  48.708K  (48708),  min  48.645K  (48645)
        -  RF3  FilterRows:  sum  4.421298M  (4421298),  avg  92.11K  (92110),  
max  92.878K  (92878),  min  91.634K  (91634)
        -  RF3  InputRows:  sum  4.456951M  (4456951),  avg  92.853K  (92853),  
max  93.618K  (93618),  min  92.338K  (92338)
        -  RF4  FilterRows:  sum  0,  avg  0,  max  0,  min  0
        -  RF4  InputRows:  sum  0,  avg  0,  max  0,  min  0
        -  RF5  FilterRows:  sum  0,  avg  0,  max  0,  min  0
        -  RF5  InputRows:  sum  340,  avg  7,  max  9,  min  6
    ```
---
 be/src/exec/olap_common.h                          | 28 +++++--
 be/src/olap/bitmap_filter_predicate.h              |  2 -
 be/src/olap/column_predicate.h                     | 45 +++++------
 be/src/olap/comparison_predicate.h                 |  6 +-
 be/src/olap/filter_olap_param.h                    | 24 ++++--
 be/src/olap/olap_common.h                          |  9 +--
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 12 ---
 be/src/olap/tablet_reader.cpp                      | 15 ++--
 be/src/pipeline/exec/datagen_operator.cpp          |  4 +-
 .../exec/multi_cast_data_stream_source.cpp         |  6 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        | 32 --------
 be/src/pipeline/exec/olap_scan_operator.h          |  6 --
 be/src/pipeline/exec/scan_operator.cpp             | 13 +--
 be/src/runtime/runtime_state.cpp                   |  4 +-
 be/src/runtime/runtime_state.h                     |  7 +-
 be/src/runtime_filter/runtime_filter_consumer.cpp  | 49 ++++++++----
 be/src/runtime_filter/runtime_filter_consumer.h    | 52 ++++++------
 .../runtime_filter_consumer_helper.cpp             | 25 ++++--
 .../runtime_filter_consumer_helper.h               | 13 ++-
 be/src/runtime_filter/runtime_filter_mgr.cpp       |  9 +--
 be/src/runtime_filter/runtime_filter_mgr.h         |  3 +-
 be/src/util/runtime_profile.cpp                    | 38 ++++-----
 be/src/util/runtime_profile.h                      | 51 ++++++------
 be/src/vec/exec/scan/olap_scanner.cpp              |  3 -
 be/src/vec/exprs/vruntimefilter_wrapper.cpp        |  6 +-
 be/src/vec/exprs/vruntimefilter_wrapper.h          | 56 +++++++------
 .../runtime_filter_consumer_helper_test.cpp        |  4 +-
 .../runtime_filter_consumer_test.cpp               | 41 +++++-----
 be/test/runtime_filter/runtime_filter_mgr_test.cpp |  3 +-
 .../runtime_filter_producer_test.cpp               | 12 +--
 .../runtime_profile_counter_tree_node_test.cpp     | 92 +++++++++++++++-------
 .../org/apache/doris/common/profile/Counter.java   | 16 +++-
 .../doris/common/profile/RuntimeProfile.java       |  8 +-
 gensrc/thrift/RuntimeProfile.thrift                |  1 +
 34 files changed, 368 insertions(+), 327 deletions(-)

diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index 65469b6c968..62604bd1bc9 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/PaloInternalService_types.h>
 #include <glog/logging.h>
 #include <stddef.h>
@@ -41,6 +42,7 @@
 #include "runtime/define_primitive_type.h"
 #include "runtime/primitive_type.h"
 #include "runtime/type_limit.h"
+#include "util/runtime_profile.h"
 #include "vec/core/types.h"
 #include "vec/io/io_helper.h"
 #include "vec/runtime/ipv4_value.h"
@@ -301,12 +303,21 @@ public:
         _contain_null = _is_nullable_col && contain_null;
     }
 
-    void set_runtime_filter_info(int runtime_filter_id,
-                                 RuntimeProfile::Counter* 
predicate_filtered_rows_counter,
-                                 RuntimeProfile::Counter* 
predicate_input_rows_counter) {
+    void attach_profile_counter(
+            int runtime_filter_id,
+            std::shared_ptr<RuntimeProfile::Counter> 
predicate_filtered_rows_counter,
+            std::shared_ptr<RuntimeProfile::Counter> 
predicate_input_rows_counter) {
+        DCHECK(predicate_filtered_rows_counter != nullptr);
+        DCHECK(predicate_input_rows_counter != nullptr);
+
         _runtime_filter_id = runtime_filter_id;
-        _predicate_filtered_rows_counter = predicate_filtered_rows_counter;
-        _predicate_input_rows_counter = predicate_input_rows_counter;
+
+        if (predicate_filtered_rows_counter != nullptr) {
+            _predicate_filtered_rows_counter = predicate_filtered_rows_counter;
+        }
+        if (predicate_input_rows_counter != nullptr) {
+            _predicate_input_rows_counter = predicate_input_rows_counter;
+        }
     }
 
     int precision() const { return _precision; }
@@ -370,8 +381,11 @@ private:
                                                   primitive_type == 
PrimitiveType::TYPE_DATETIMEV2;
 
     int _runtime_filter_id = -1;
-    RuntimeProfile::Counter* _predicate_filtered_rows_counter = nullptr;
-    RuntimeProfile::Counter* _predicate_input_rows_counter = nullptr;
+
+    std::shared_ptr<RuntimeProfile::Counter> _predicate_filtered_rows_counter =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
+    std::shared_ptr<RuntimeProfile::Counter> _predicate_input_rows_counter =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
 };
 
 class OlapScanKeys {
diff --git a/be/src/olap/bitmap_filter_predicate.h 
b/be/src/olap/bitmap_filter_predicate.h
index 12cbd94ec8d..c1488869a04 100644
--- a/be/src/olap/bitmap_filter_predicate.h
+++ b/be/src/olap/bitmap_filter_predicate.h
@@ -113,8 +113,6 @@ uint16_t 
BitmapFilterColumnPredicate<T>::_evaluate_inner(const vectorized::IColu
     } else {
         new_size = evaluate<false>(column, nullptr, sel, size);
     }
-    _evaluated_rows += size;
-    _passed_rows += new_size;
     update_filter_info(size - new_size, size);
     return new_size;
 }
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 628b49c6213..92e5dea4537 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <memory>
 #include <roaring/roaring.hh>
 
 #include "common/exception.h"
@@ -24,6 +25,7 @@
 #include "olap/rowset/segment_v2/bloom_filter.h"
 #include "olap/rowset/segment_v2/inverted_index_reader.h"
 #include "runtime/define_primitive_type.h"
+#include "util/runtime_profile.h"
 #include "vec/columns/column.h"
 #include "vec/exprs/vruntimefilter_wrapper.h"
 
@@ -184,8 +186,6 @@ public:
         }
 
         uint16_t new_size = _evaluate_inner(column, sel, size);
-        _evaluated_rows += size;
-        _passed_rows += new_size;
         if (_can_ignore()) {
             do_judge_selectivity(size - new_size, size);
         }
@@ -255,31 +255,25 @@ public:
 
     int get_runtime_filter_id() const { return _runtime_filter_id; }
 
-    void set_runtime_filter_info(int filter_id,
-                                 RuntimeProfile::Counter* 
predicate_filtered_rows_counter,
-                                 RuntimeProfile::Counter* 
predicate_input_rows_counter) {
-        if (filter_id >= 0) {
-            DCHECK(predicate_filtered_rows_counter != nullptr);
-            DCHECK(predicate_input_rows_counter != nullptr);
-        }
+    void attach_profile_counter(
+            int filter_id, std::shared_ptr<RuntimeProfile::Counter> 
predicate_filtered_rows_counter,
+            std::shared_ptr<RuntimeProfile::Counter> 
predicate_input_rows_counter) {
         _runtime_filter_id = filter_id;
-        _predicate_filtered_rows_counter = predicate_filtered_rows_counter;
-        _predicate_input_rows_counter = predicate_input_rows_counter;
-    }
+        DCHECK(predicate_filtered_rows_counter != nullptr);
+        DCHECK(predicate_input_rows_counter != nullptr);
 
-    /// TODO: Currently we only record statistics for runtime filters, in the 
future we should record for all predicates
-    void update_filter_info(int64_t filter_rows, int64_t input_rows) const {
-        if (_predicate_input_rows_counter) {
-            COUNTER_UPDATE(_predicate_input_rows_counter, input_rows);
+        if (predicate_filtered_rows_counter != nullptr) {
+            _predicate_filtered_rows_counter = predicate_filtered_rows_counter;
         }
-        if (_predicate_filtered_rows_counter) {
-            COUNTER_UPDATE(_predicate_filtered_rows_counter, filter_rows);
+        if (predicate_input_rows_counter != nullptr) {
+            _predicate_input_rows_counter = predicate_input_rows_counter;
         }
     }
 
-    PredicateFilterInfo get_filtered_info() const {
-        return PredicateFilterInfo {static_cast<int>(type()), _evaluated_rows 
- 1,
-                                    _evaluated_rows - 1 - _passed_rows};
+    /// TODO: Currently we only record statistics for runtime filters, in the 
future we should record for all predicates
+    void update_filter_info(int64_t filter_rows, int64_t input_rows) const {
+        COUNTER_UPDATE(_predicate_input_rows_counter, input_rows);
+        COUNTER_UPDATE(_predicate_filtered_rows_counter, filter_rows);
     }
 
     static std::string pred_type_string(PredicateType type) {
@@ -353,8 +347,6 @@ protected:
     // TODO: the value is only in delete condition, better be template value
     bool _opposite;
     int _runtime_filter_id = -1;
-    mutable uint64_t _evaluated_rows = 1;
-    mutable uint64_t _passed_rows = 0;
     // VRuntimeFilterWrapper and ColumnPredicate share the same logic,
     // but it's challenging to unify them, so the code is duplicated.
     // _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true
@@ -368,8 +360,11 @@ protected:
     mutable uint64_t _judge_filter_rows = 0;
     mutable bool _always_true = false;
 
-    RuntimeProfile::Counter* _predicate_filtered_rows_counter = nullptr;
-    RuntimeProfile::Counter* _predicate_input_rows_counter = nullptr;
+    std::shared_ptr<RuntimeProfile::Counter> _predicate_filtered_rows_counter =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
+
+    std::shared_ptr<RuntimeProfile::Counter> _predicate_input_rows_counter =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
 };
 
 } //namespace doris
diff --git a/be/src/olap/comparison_predicate.h 
b/be/src/olap/comparison_predicate.h
index d7bf38a6c6a..1fddd4b1046 100644
--- a/be/src/olap/comparison_predicate.h
+++ b/be/src/olap/comparison_predicate.h
@@ -267,9 +267,12 @@ public:
             } else {
                 current_evaluated_rows += size;
             }
-            _evaluated_rows += current_evaluated_rows;
         }
 
+        // defer is created after its reference args are created.
+        // so defer will be destroyed BEFORE the reference args.
+        // so reference here is safe.
+        // 
https://stackoverflow.com/questions/14688285/c-local-variable-destruction-order
         Defer defer([&]() {
             update_filter_info(current_evaluated_rows - current_passed_rows,
                                current_evaluated_rows);
@@ -359,7 +362,6 @@ public:
             for (uint16_t i = 0; i < size; i++) {
                 current_passed_rows += flags[i];
             }
-            _passed_rows += current_passed_rows;
             do_judge_selectivity(current_evaluated_rows - current_passed_rows,
                                  current_evaluated_rows);
         }
diff --git a/be/src/olap/filter_olap_param.h b/be/src/olap/filter_olap_param.h
index d9aa6386ec6..272fee63fb5 100644
--- a/be/src/olap/filter_olap_param.h
+++ b/be/src/olap/filter_olap_param.h
@@ -24,18 +24,28 @@ namespace doris {
 template <typename T>
 struct FilterOlapParam {
     FilterOlapParam(std::string column_name, T filter, int runtime_filter_id,
-                    RuntimeProfile::Counter* filtered_counter,
-                    RuntimeProfile::Counter* input_counter)
+                    std::shared_ptr<RuntimeProfile::Counter> filtered_counter,
+                    std::shared_ptr<RuntimeProfile::Counter> input_counter)
             : column_name(std::move(column_name)),
               filter(std::move(filter)),
-              runtime_filter_id(runtime_filter_id),
-              filtered_rows_counter(filtered_counter),
-              input_rows_counter(input_counter) {}
+              runtime_filter_id(runtime_filter_id) {
+        DCHECK(filtered_rows_counter != nullptr);
+        DCHECK(input_rows_counter != nullptr);
+        if (filtered_counter != nullptr) {
+            filtered_rows_counter = filtered_counter;
+        }
+        if (input_counter != nullptr) {
+            input_rows_counter = input_counter;
+        }
+    }
+
     std::string column_name;
     T filter;
     int runtime_filter_id;
-    RuntimeProfile::Counter* filtered_rows_counter = nullptr;
-    RuntimeProfile::Counter* input_rows_counter = nullptr;
+    std::shared_ptr<RuntimeProfile::Counter> filtered_rows_counter =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
+    std::shared_ptr<RuntimeProfile::Counter> input_rows_counter =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
 };
 
 } // namespace doris
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 616bcf980de..fe92bea5017 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -77,11 +77,7 @@ struct DataDirInfo {
     DataDirType data_dir_type = DataDirType::OLAP_DATA_DIR;
     std::string metric_name;
 };
-struct PredicateFilterInfo {
-    int type = 0;
-    uint64_t input_row = 0;
-    uint64_t filtered_row = 0;
-};
+
 // Sort DataDirInfo by available space.
 struct DataDirInfoLessAvailability {
     bool operator()(const DataDirInfo& left, const DataDirInfo& right) const {
@@ -337,9 +333,6 @@ struct OlapReaderStatistics {
     int64_t short_cond_ns = 0;
     int64_t expr_filter_ns = 0;
     int64_t output_col_ns = 0;
-
-    std::map<int, PredicateFilterInfo> filter_info;
-
     int64_t rows_key_range_filtered = 0;
     int64_t rows_stats_filtered = 0;
     int64_t rows_stats_rp_filtered = 0;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 00306091876..ce4ec01c8ba 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -1877,17 +1877,6 @@ uint16_t 
SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro
     return selected_size;
 }
 
-void SegmentIterator::_collect_runtime_filter_predicate() {
-    // collect profile
-    for (auto* p : _filter_info_id) {
-        // There is a situation, such as with in or minmax filters,
-        // where intermediate conversion to a key range or other types
-        // prevents obtaining the filter id.
-        if (p->is_runtime_filter()) {
-            _opts.stats->filter_info[p->get_runtime_filter_id()] = 
p->get_filtered_info();
-        }
-    }
-}
 Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& 
read_column_ids,
                                                 std::vector<rowid_t>& 
rowid_vector,
                                                 uint16_t* sel_rowid_idx, 
size_t select_size,
@@ -2147,7 +2136,6 @@ Status 
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
             //          In SSB test, it make no difference; So need more 
scenarios to test
             selected_size = 
_evaluate_short_circuit_predicate(_sel_rowid_idx.data(), selected_size);
 
-            _collect_runtime_filter_predicate();
             if (selected_size > 0) {
                 // step 3.1: output short circuit and predicate column
                 // when lazy materialization enables, _predicate_column_ids = 
distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index 4503749e1fe..be2be8626b8 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -520,16 +520,13 @@ Status TabletReader::_init_orderby_keys_param(const 
ReaderParams& read_params) {
 Status TabletReader::_init_conditions_param(const ReaderParams& read_params) {
     SCOPED_RAW_TIMER(&_stats.tablet_reader_init_conditions_param_timer_ns);
     std::vector<ColumnPredicate*> predicates;
-    auto emplace_predicate = [&predicates](auto& param, ColumnPredicate* 
predicate) {
-        predicate->set_runtime_filter_info(param.runtime_filter_id, 
param.filtered_rows_counter,
-                                           param.input_rows_counter);
-        predicates.emplace_back(predicate);
-    };
 
-    auto parse_and_emplace_predicates = [this, &emplace_predicate](auto& 
params) {
+    auto parse_and_emplace_predicates = [this, &predicates](auto& params) {
         for (const auto& param : params) {
             ColumnPredicate* predicate = 
_parse_to_predicate({param.column_name, param.filter});
-            emplace_predicate(param, predicate);
+            predicate->attach_profile_counter(param.runtime_filter_id, 
param.filtered_rows_counter,
+                                              param.input_rows_counter);
+            predicates.emplace_back(predicate);
         }
     };
 
@@ -545,7 +542,9 @@ Status TabletReader::_init_conditions_param(const 
ReaderParams& read_params) {
                 parse_to_predicate(mcolumn, index, tmp_cond, 
_predicate_arena.get());
         // record condition value into predicate_params in order to pushdown 
segment_iterator,
         // _gen_predicate_result_sign will build predicate result unique sign 
with condition value
-        emplace_predicate(param, predicate);
+        predicate->attach_profile_counter(param.runtime_filter_id, 
param.filtered_rows_counter,
+                                          param.input_rows_counter);
+        predicates.emplace_back(predicate);
     }
     parse_and_emplace_predicates(read_params.bloom_filters);
     parse_and_emplace_predicates(read_params.bitmap_filters);
diff --git a/be/src/pipeline/exec/datagen_operator.cpp 
b/be/src/pipeline/exec/datagen_operator.cpp
index 5d2c80874bd..c494edd397b 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -96,8 +96,8 @@ Status DataGenLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     // TODO: use runtime filter to filte result block, maybe this node need 
derive from vscan_node.
     for (const auto& filter_desc : p._runtime_filter_descs) {
         std::shared_ptr<RuntimeFilterConsumer> filter;
-        RETURN_IF_ERROR(state->register_consumer_runtime_filter(
-                filter_desc, p.is_serial_operator(), p.node_id(), &filter, 
_runtime_profile.get()));
+        RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, 
p.is_serial_operator(),
+                                                                p.node_id(), 
&filter));
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index fd9788e326a..37aa8fa3b77 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -43,8 +43,8 @@ Status 
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     _filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
     _get_data_timer = ADD_TIMER(_runtime_profile, "GetDataTime");
     _materialize_data_timer = ADD_TIMER(_runtime_profile, 
"MaterializeDataTime");
-    RETURN_IF_ERROR(_helper.init(state, profile(), false, 
_filter_dependencies, p.operator_id(),
-                                 p.node_id(), p.get_name() + 
"_FILTER_DEPENDENCY"));
+    RETURN_IF_ERROR(_helper.init(state, false, _filter_dependencies, 
p.operator_id(), p.node_id(),
+                                 p.get_name() + "_FILTER_DEPENDENCY"));
     return Status::OK();
 }
 
@@ -81,7 +81,7 @@ Status 
MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
         rf_time += dep->watcher_elapse_time();
     }
     COUNTER_SET(_wait_for_rf_timer, rf_time);
-
+    _helper.collect_realtime_profile(profile());
     return Base::close(state);
 }
 
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index b93c22274d9..97297dd4b6c 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -157,8 +157,6 @@ Status OlapScanLocalState::_init_profile() {
     _total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", 
TUnit::UNIT);
     _tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT);
     _key_range_counter = ADD_COUNTER(_runtime_profile, "KeyRangesNum", 
TUnit::UNIT);
-    _runtime_filter_info = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, 
"RuntimeFilterInfo", 1);
-
     _tablet_reader_init_timer = ADD_TIMER(_scanner_profile, 
"TabletReaderInitTimer");
     _tablet_reader_capture_rs_readers_timer =
             ADD_TIMER(_scanner_profile, "TabletReaderCaptureRsReadersTimer");
@@ -643,36 +641,6 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() 
{
     return Status::OK();
 }
 
-void OlapScanLocalState::add_filter_info(int id, const PredicateFilterInfo& 
update_info) {
-    std::unique_lock lock(_profile_mtx);
-    // update
-    _filter_info[id].filtered_row += update_info.filtered_row;
-    _filter_info[id].input_row += update_info.input_row;
-    _filter_info[id].type = update_info.type;
-    // to string
-    auto& info = _filter_info[id];
-    std::string filter_name = "RuntimeFilterInfo id ";
-    filter_name += std::to_string(id);
-    std::string info_str;
-    info_str += "type = " + 
type_to_string(static_cast<PredicateType>(info.type)) + ", ";
-    info_str += "input = " + std::to_string(info.input_row) + ", ";
-    info_str += "filtered = " + std::to_string(info.filtered_row);
-    info_str = "[" + info_str + "]";
-
-    // add info
-    _segment_profile->add_info_string(filter_name, info_str);
-
-    const std::string rf_name = "filter id = " + std::to_string(id) + " ";
-
-    // add counter
-    auto* input_count = ADD_CHILD_COUNTER_WITH_LEVEL(_runtime_profile, rf_name 
+ "input",
-                                                     TUnit::UNIT, 
"RuntimeFilterInfo", 1);
-    auto* filtered_count = ADD_CHILD_COUNTER_WITH_LEVEL(_runtime_profile, 
rf_name + "filtered",
-                                                        TUnit::UNIT, 
"RuntimeFilterInfo", 1);
-    COUNTER_SET(input_count, (int64_t)info.input_row);
-    COUNTER_SET(filtered_count, (int64_t)info.filtered_row);
-}
-
 OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, 
int operator_id,
                                      const DescriptorTbl& descs, int 
parallel_tasks,
                                      const TQueryCacheParam& param)
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index f6275df046e..ee3d995958a 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -87,8 +87,6 @@ private:
 
     Status _init_scanners(std::list<vectorized::ScannerSPtr>* scanners) 
override;
 
-    void add_filter_info(int id, const PredicateFilterInfo& info);
-
     Status _build_key_ranges_and_filters();
 
     std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
@@ -122,7 +120,6 @@ private:
     RuntimeProfile::Counter* _short_cond_timer = nullptr;
     RuntimeProfile::Counter* _expr_filter_timer = nullptr;
     RuntimeProfile::Counter* _output_col_timer = nullptr;
-    std::map<int, PredicateFilterInfo> _filter_info;
 
     RuntimeProfile::Counter* _stats_filtered_counter = nullptr;
     RuntimeProfile::Counter* _stats_rp_filtered_counter = nullptr;
@@ -188,8 +185,6 @@ private:
     // total number of segment related to this scan node
     RuntimeProfile::Counter* _total_segment_counter = nullptr;
 
-    RuntimeProfile::Counter* _runtime_filter_info = nullptr;
-
     // timer about tablet reader
     RuntimeProfile::Counter* _tablet_reader_init_timer = nullptr;
     RuntimeProfile::Counter* _tablet_reader_capture_rs_readers_timer = nullptr;
@@ -217,7 +212,6 @@ private:
     RuntimeProfile::Counter* _segment_create_column_readers_timer = nullptr;
     RuntimeProfile::Counter* _segment_load_index_timer = nullptr;
 
-    std::mutex _profile_mtx;
     std::vector<TabletWithVersion> _tablets;
     std::vector<TabletReader::ReadSource> _read_sources;
 };
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index fdc5678862e..3db2d5ced00 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -18,6 +18,7 @@
 #include "scan_operator.h"
 
 #include <fmt/format.h>
+#include <gen_cpp/Metrics_types.h>
 
 #include <cstdint>
 #include <memory>
@@ -73,7 +74,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<typename Derived::Parent>();
-    RETURN_IF_ERROR(_helper.init(state, profile(), p.is_serial_operator(), 
_filter_dependencies,
+    RETURN_IF_ERROR(_helper.init(state, p.is_serial_operator(), 
_filter_dependencies,
                                  p.operator_id(), p.node_id(),
                                  p.get_name() + "_FILTER_DEPENDENCY"));
     RETURN_IF_ERROR(_init_profile());
@@ -287,7 +288,9 @@ Status ScanLocalState<Derived>::_normalize_predicate(
                                 if (need_set_runtime_filter_id) {
                                     auto* rf_expr = 
assert_cast<vectorized::VRuntimeFilterWrapper*>(
                                             conjunct_expr_root.get());
-                                    value_range.set_runtime_filter_info(
+                                    
DCHECK(rf_expr->predicate_filtered_rows_counter() != nullptr);
+                                    
DCHECK(rf_expr->predicate_input_rows_counter() != nullptr);
+                                    value_range.attach_profile_counter(
                                             rf_expr->filter_id(),
                                             
rf_expr->predicate_filtered_rows_counter(),
                                             
rf_expr->predicate_input_rows_counter());
@@ -591,8 +594,8 @@ Status 
ScanLocalState<Derived>::_normalize_in_and_eq_predicate(vectorized::VExpr
                 iter = hybrid_set->begin();
             } else {
                 int runtime_filter_id = -1;
-                RuntimeProfile::Counter* predicate_filtered_rows_counter = 
nullptr;
-                RuntimeProfile::Counter* predicate_input_rows_counter = 
nullptr;
+                std::shared_ptr<RuntimeProfile::Counter> 
predicate_filtered_rows_counter = nullptr;
+                std::shared_ptr<RuntimeProfile::Counter> 
predicate_input_rows_counter = nullptr;
                 if (expr_ctx->root()->is_rf_wrapper()) {
                     auto* rf_expr =
                             
assert_cast<vectorized::VRuntimeFilterWrapper*>(expr_ctx->root().get());
@@ -1275,7 +1278,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
     std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners);
     COUNTER_SET(_wait_for_dependency_timer, 
_scan_dependency->watcher_elapse_time());
     COUNTER_SET(_wait_for_rf_timer, rf_time);
-
+    _helper.collect_realtime_profile(profile());
     return PipelineXLocalState<>::close(state);
 }
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 4a8ccccb9ec..b0600320f0d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -508,10 +508,10 @@ Status RuntimeState::register_producer_runtime_filter(
 
 Status RuntimeState::register_consumer_runtime_filter(
         const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int 
node_id,
-        std::shared_ptr<RuntimeFilterConsumer>* consumer_filter, 
RuntimeProfile* parent_profile) {
+        std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) {
     bool need_merge = desc.has_remote_targets || need_local_merge;
     RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : 
local_runtime_filter_mgr();
-    return mgr->register_consumer_filter(desc, node_id, consumer_filter, 
parent_profile);
+    return mgr->register_consumer_filter(desc, node_id, consumer_filter);
 }
 
 bool RuntimeState::is_nereids() const {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index a1b3f15c0aa..6db14254a11 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -565,10 +565,9 @@ public:
                                             
std::shared_ptr<RuntimeFilterProducer>* producer_filter,
                                             RuntimeProfile* parent_profile);
 
-    Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& 
desc,
-                                            bool need_local_merge, int node_id,
-                                            
std::shared_ptr<RuntimeFilterConsumer>* consumer_filter,
-                                            RuntimeProfile* parent_profile);
+    Status register_consumer_runtime_filter(
+            const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int 
node_id,
+            std::shared_ptr<RuntimeFilterConsumer>* consumer_filter);
 
     bool is_nereids() const;
 
diff --git a/be/src/runtime_filter/runtime_filter_consumer.cpp 
b/be/src/runtime_filter/runtime_filter_consumer.cpp
index 33daafca322..5a35c7ca5ad 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer.cpp
@@ -17,11 +17,11 @@
 
 #include "runtime_filter/runtime_filter_consumer.h"
 
-#include "exprs/create_predicate_function.h"
+#include "exprs/minmax_predicate.h"
+#include "util/runtime_profile.h"
 #include "vec/exprs/vbitmap_predicate.h"
 #include "vec/exprs/vbloom_predicate.h"
 #include "vec/exprs/vdirect_in_predicate.h"
-#include "vec/exprs/vexpr_context.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
@@ -38,23 +38,9 @@ Status RuntimeFilterConsumer::_apply_ready_expr(
 
     auto origin_size = push_exprs.size();
     RETURN_IF_ERROR(_get_push_exprs(push_exprs, _probe_expr));
-    // The runtime filter is pushed down, adding filtering information.
-    auto* expr_filtered_rows_counter = 
_execution_profile->add_collaboration_counter(
-            "ExprFilteredRows", TUnit::UNIT, _rf_filter);
-    auto* expr_input_rows_counter =
-            _execution_profile->add_collaboration_counter("ExprInputRows", 
TUnit::UNIT, _rf_input);
-    auto* expr_always_true_counter =
-            ADD_COUNTER(_execution_profile, "AlwaysTruePassRows", TUnit::UNIT);
-
-    auto* predicate_filtered_rows_counter = 
_storage_profile->add_collaboration_counter(
-            "PredicateFilteredRows", TUnit::UNIT, _rf_filter);
-    auto* predicate_input_rows_counter = 
_storage_profile->add_collaboration_counter(
-            "PredicateInputRows", TUnit::UNIT, _rf_input);
 
     for (auto i = origin_size; i < push_exprs.size(); i++) {
-        push_exprs[i]->attach_profile_counter(
-                expr_filtered_rows_counter, expr_input_rows_counter, 
expr_always_true_counter,
-                predicate_filtered_rows_counter, predicate_input_rows_counter);
+        push_exprs[i]->attach_profile_counter(_rf_input, _rf_filter, 
_always_true_counter);
     }
     return Status::OK();
 }
@@ -225,4 +211,33 @@ Status 
RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
     return Status::OK();
 }
 
+void RuntimeFilterConsumer::collect_realtime_profile(RuntimeProfile* 
parent_operator_profile) {
+    DCHECK(parent_operator_profile != nullptr);
+
+    // Counter* is owned by RuntimeProfile, so no need to free.
+    RuntimeProfile::Counter* c = parent_operator_profile->add_counter(
+            fmt::format("RF{} InputRows", _filter_id), TUnit::UNIT, 
"RuntimeFilterInfo", 1);
+    c->update(_rf_input->value());
+
+    c = parent_operator_profile->add_counter(fmt::format("RF{} FilterRows", 
_filter_id),
+                                             TUnit::UNIT, "RuntimeFilterInfo", 
1);
+    c->update(_rf_filter->value());
+    c = parent_operator_profile->add_counter(fmt::format("RF{} WaitTime", 
_filter_id),
+                                             TUnit::TIME_NS, 
"RuntimeFilterInfo", 2);
+    c->update(_wait_timer->value());
+
+    c = parent_operator_profile->add_counter(fmt::format("RF{} 
AlwaysTrueFilterRows", _filter_id),
+                                             TUnit::UNIT, "RuntimeFilterInfo", 
2);
+    c->update(_always_true_counter->value());
+
+    {
+        // since debug_string will read from  RuntimeFilter::_wrapper
+        // and it is a shared_ptr, instead of a atomic_shared_ptr
+        // so it is not thread safe
+        std::unique_lock<std::mutex> l(_mtx);
+        parent_operator_profile->add_description(fmt::format("RF{} Info", 
_filter_id),
+                                                 debug_string(), 
"RuntimeFilterInfo");
+    }
+}
+
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_consumer.h 
b/be/src/runtime_filter/runtime_filter_consumer.h
index cc6581fa9fa..eba46b98406 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.h
+++ b/be/src/runtime_filter/runtime_filter_consumer.h
@@ -17,6 +17,9 @@
 
 #pragma once
 
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
 #include <string>
 
 #include "pipeline/dependency.h"
@@ -39,12 +42,10 @@ public:
     };
 
     static Status create(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
-                         int node_id, std::shared_ptr<RuntimeFilterConsumer>* 
res,
-                         RuntimeProfile* parent_profile) {
+                         int node_id, std::shared_ptr<RuntimeFilterConsumer>* 
res) {
         *res = std::shared_ptr<RuntimeFilterConsumer>(
-                new RuntimeFilterConsumer(state, desc, node_id, 
parent_profile));
+                new RuntimeFilterConsumer(state, desc, node_id));
         RETURN_IF_ERROR((*res)->_init_with_desc(desc, 
&state->get_query_ctx()->query_options()));
-        (*res)->_profile->add_info_string("Info", ((*res)->debug_string()));
         return Status::OK();
     }
 
@@ -65,6 +66,9 @@ public:
 
     bool is_applied() { return _rf_state == State::APPLIED; }
 
+    // Called by RuntimeFilterConsumerHelper
+    void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
+
     static std::string to_string(const State& state) {
         switch (state) {
         case State::NOT_READY:
@@ -81,31 +85,20 @@ public:
     }
 
 private:
+    friend class RuntimeFilterProducer;
+
     RuntimeFilterConsumer(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
-                          int node_id, RuntimeProfile* parent_profile)
+                          int node_id)
             : RuntimeFilter(state, desc),
               _probe_expr(desc->planId_to_target_expr.find(node_id)->second),
-              _profile(new RuntimeProfile(fmt::format("RF{}", 
desc->filter_id))),
-              _storage_profile(new RuntimeProfile(fmt::format("Storage", 
desc->filter_id))),
-              _execution_profile(new RuntimeProfile(fmt::format("Execution", 
desc->filter_id))),
               _registration_time(MonotonicMillis()),
-              _rf_state(State::NOT_READY) {
+              _rf_state(State::NOT_READY),
+              _filter_id(desc->filter_id) {
         // If bitmap filter is not applied, it will cause the query result to 
be incorrect
         bool wait_infinitely = 
_state->get_query_ctx()->runtime_filter_wait_infinitely() ||
                                _runtime_filter_type == 
RuntimeFilterType::BITMAP_FILTER;
         _rf_wait_time_ms = wait_infinitely ? 
_state->get_query_ctx()->execution_timeout() * 1000
                                            : 
_state->get_query_ctx()->runtime_filter_wait_time_ms();
-        _profile->add_info_string("TimeoutLimit", 
std::to_string(_rf_wait_time_ms) + "ms");
-
-        parent_profile->add_child(_profile.get(), true, nullptr);
-        _profile->add_child(_storage_profile.get(), true, nullptr);
-        _profile->add_child(_execution_profile.get(), true, nullptr);
-        _wait_timer = ADD_TIMER(_profile, "WaitTime");
-
-        _rf_filter = ADD_COUNTER_WITH_LEVEL(
-                parent_profile, fmt::format("RF{} FilterRows", 
desc->filter_id), TUnit::UNIT, 1);
-        _rf_input = ADD_COUNTER_WITH_LEVEL(
-                parent_profile, fmt::format("RF{} InputRows", 
desc->filter_id), TUnit::UNIT, 1);
         DorisMetrics::instance()->runtime_filter_consumer_num->increment(1);
     }
 
@@ -142,21 +135,25 @@ private:
             _check_state({State::NOT_READY, State::TIMEOUT});
         }
         _rf_state = rf_state;
-        _profile->add_info_string("Info", debug_string());
     }
 
     TExpr _probe_expr;
 
     std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
 
-    std::unique_ptr<RuntimeProfile> _profile;
-    std::unique_ptr<RuntimeProfile> _storage_profile;   // for storage layer 
stats
-    std::unique_ptr<RuntimeProfile> _execution_profile; // for execution layer 
stats
-    RuntimeProfile::Counter* _wait_timer = nullptr;
+    std::shared_ptr<RuntimeProfile::Counter> _wait_timer =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::TIME_NS, 0);
     //_rf_filter is used to record the number of rows filtered by the runtime 
filter.
     //It aggregates the filtering statistics from both the Storage and 
Execution.
-    RuntimeProfile::Counter* _rf_filter = nullptr;
-    RuntimeProfile::Counter* _rf_input = nullptr;
+    // Counter will be shared by RuntimeFilterConsumer & VRuntimeFilterWrapper
+    // OperatorLocalState's close method will collect the statistics from 
RuntimeFilterConsumer
+    // VRuntimeFilterWrapper will update the statistics.
+    std::shared_ptr<RuntimeProfile::Counter> _rf_filter =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0, 1);
+    std::shared_ptr<RuntimeProfile::Counter> _rf_input =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0, 1);
+    std::shared_ptr<RuntimeProfile::Counter> _always_true_counter =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0, 1);
 
     int32_t _rf_wait_time_ms;
     const int64_t _registration_time;
@@ -169,6 +166,7 @@ private:
     bool _reached_timeout = false;
 
     friend class RuntimeFilterProducer;
+    int _filter_id = -1;
 };
 #include "common/compile_check_end.h"
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp 
b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
index 52ceeb59c77..0ec0b2fb38a 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
@@ -19,6 +19,7 @@
 
 #include "pipeline/pipeline_task.h"
 #include "runtime_filter/runtime_filter_consumer.h"
+#include "util/runtime_profile.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
@@ -27,19 +28,16 @@ RuntimeFilterConsumerHelper::RuntimeFilterConsumerHelper(
         const RowDescriptor& row_descriptor)
         : _node_id(_node_id),
           _runtime_filter_descs(runtime_filters),
-          _row_descriptor_ref(row_descriptor),
-          _profile(new RuntimeProfile("RuntimeFilterConsumerHelper")) {
+          _row_descriptor_ref(row_descriptor) {
     _blocked_by_rf = std::make_shared<std::atomic_bool>(false);
 }
 
 Status RuntimeFilterConsumerHelper::init(
-        RuntimeState* state, RuntimeProfile* profile, bool need_local_merge,
+        RuntimeState* state, bool need_local_merge,
         std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>& 
dependencies, const int id,
         const int node_id, const std::string& name) {
     _state = state;
-    profile->add_child(_profile.get(), true, nullptr);
     RETURN_IF_ERROR(_register_runtime_filter(need_local_merge));
-    _acquire_runtime_filter_timer = ADD_TIMER(_profile, 
"AcquireRuntimeFilterTime");
     _init_dependency(dependencies, id, node_id, name);
     return Status::OK();
 }
@@ -49,7 +47,7 @@ Status 
RuntimeFilterConsumerHelper::_register_runtime_filter(bool need_local_mer
     for (size_t i = 0; i < filter_size; ++i) {
         std::shared_ptr<RuntimeFilterConsumer> filter;
         RETURN_IF_ERROR(_state->register_consumer_runtime_filter(
-                _runtime_filter_descs[i], need_local_merge, _node_id, &filter, 
_profile.get()));
+                _runtime_filter_descs[i], need_local_merge, _node_id, 
&filter));
         _consumers.emplace_back(filter);
     }
     return Status::OK();
@@ -84,7 +82,7 @@ void RuntimeFilterConsumerHelper::_init_dependency(
 
 Status RuntimeFilterConsumerHelper::acquire_runtime_filter(
         vectorized::VExprContextSPtrs& conjuncts) {
-    SCOPED_TIMER(_acquire_runtime_filter_timer);
+    SCOPED_TIMER(_acquire_runtime_filter_timer.get());
     std::vector<vectorized::VRuntimeFilterPtr> vexprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         RETURN_IF_ERROR(_consumers[i]->acquire_expr(vexprs));
@@ -148,4 +146,17 @@ Status 
RuntimeFilterConsumerHelper::try_append_late_arrival_runtime_filter(
     return Status::OK();
 }
 
+void RuntimeFilterConsumerHelper::collect_realtime_profile(
+        RuntimeProfile* parent_operator_profile) {
+    std::ignore = parent_operator_profile->add_counter("RuntimeFilterInfo", 
TUnit::NONE,
+                                                       
RuntimeProfile::ROOT_COUNTER, 1);
+    RuntimeProfile::Counter* c = parent_operator_profile->add_counter(
+            "AcquireRuntimeFilter", TUnit::TIME_NS, "RuntimeFilterInfo", 2);
+    c->update(_acquire_runtime_filter_timer->value());
+
+    for (auto& consumer : _consumers) {
+        consumer->collect_realtime_profile(parent_operator_profile);
+    }
+}
+
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.h 
b/be/src/runtime_filter/runtime_filter_consumer_helper.h
index 55005b6da33..644343c431b 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.h
@@ -17,7 +17,10 @@
 
 #pragma once
 
+#include <mutex>
+
 #include "pipeline/dependency.h"
+#include "util/runtime_profile.h"
 #include "vec/exprs/vruntimefilter_wrapper.h"
 
 namespace doris {
@@ -33,7 +36,7 @@ public:
                                 const RowDescriptor& row_descriptor);
     ~RuntimeFilterConsumerHelper() = default;
 
-    Status init(RuntimeState* state, RuntimeProfile* profile, bool 
need_local_merge,
+    Status init(RuntimeState* state, bool need_local_merge,
                 
std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
                         runtime_filter_dependencies,
                 const int id, const int node_id, const std::string& name);
@@ -46,6 +49,10 @@ public:
     Status try_append_late_arrival_runtime_filter(int* arrived_rf_num,
                                                   
vectorized::VExprContextSPtrs& conjuncts);
 
+    // Called by XXXLocalState::close()
+    // parent_operator_profile is owned by LocalState so update it is safe at 
here.
+    void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
+
 private:
     // Register and get all runtime filters at Init phase.
     Status _register_runtime_filter(bool need_local_merge);
@@ -70,8 +77,8 @@ private:
     bool _is_all_rf_applied = true;
     std::shared_ptr<std::atomic_bool> _blocked_by_rf;
 
-    RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
-    std::unique_ptr<RuntimeProfile> _profile;
+    std::unique_ptr<RuntimeProfile::Counter> _acquire_runtime_filter_timer =
+            std::make_unique<RuntimeProfile::Counter>(TUnit::TIME_NS, 0);
 };
 #include "common/compile_check_end.h"
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index 4b718620997..a5c7ab71b19 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -70,15 +70,14 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
RuntimeFilterMgr::get_consum
     return iter->second;
 }
 
-Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& 
desc, int node_id,
-                                                  
std::shared_ptr<RuntimeFilterConsumer>* consumer,
-                                                  RuntimeProfile* 
parent_profile) {
+Status RuntimeFilterMgr::register_consumer_filter(
+        const TRuntimeFilterDesc& desc, int node_id,
+        std::shared_ptr<RuntimeFilterConsumer>* consumer) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
     std::lock_guard<std::mutex> l(_lock);
-    RETURN_IF_ERROR(
-            RuntimeFilterConsumer::create(_state, &desc, node_id, consumer, 
parent_profile));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::create(_state, &desc, node_id, 
consumer));
     _consumer_map[key].push_back(*consumer);
     return Status::OK();
 }
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h 
b/be/src/runtime_filter/runtime_filter_mgr.h
index b6e2c89e820..edaf100a7f2 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -89,8 +89,7 @@ public:
     // get/set consumer
     std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
get_consume_filters(int filter_id);
     Status register_consumer_filter(const TRuntimeFilterDesc& desc, int 
node_id,
-                                    std::shared_ptr<RuntimeFilterConsumer>* 
consumer_filter,
-                                    RuntimeProfile* parent_profile);
+                                    std::shared_ptr<RuntimeFilterConsumer>* 
consumer_filter);
 
     Status register_local_merger_producer_filter(
             const TRuntimeFilterDesc& desc, 
std::shared_ptr<RuntimeFilterProducer> producer_filter,
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index cf684304697..d9fb7183321 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -397,9 +397,6 @@ RuntimeProfile::Counter* RuntimeProfile::add_counter(const 
std::string& name, TU
     std::lock_guard<std::mutex> l(_counter_map_lock);
 
     if (_counter_map.find(name) != _counter_map.end()) {
-        // TODO: FIX DUPLICATE COUNTERS
-        // In production, we will return the existing counter.
-        // This will not make be crash, but the result may be wrong.
         return _counter_map[name];
     }
 
@@ -434,19 +431,16 @@ RuntimeProfile::NonZeroCounter* 
RuntimeProfile::add_nonzero_counter(
     return counter;
 }
 
-RuntimeProfile::CollaborationCounter* 
RuntimeProfile::add_collaboration_counter(
-        const std::string& name, TUnit::type type, Counter* other_counter,
-        const std::string& parent_counter_name, int64_t level) {
+RuntimeProfile::DerivedCounter* RuntimeProfile::add_derived_counter(
+        const std::string& name, TUnit::type type, const 
DerivedCounterFunction& counter_fn,
+        const std::string& parent_counter_name) {
     std::lock_guard<std::mutex> l(_counter_map_lock);
+
     if (_counter_map.find(name) != _counter_map.end()) {
-        DCHECK(dynamic_cast<CollaborationCounter*>(_counter_map[name]));
-        return static_cast<CollaborationCounter*>(_counter_map[name]);
+        return nullptr;
     }
 
-    DCHECK(parent_counter_name == ROOT_COUNTER ||
-           _counter_map.find(parent_counter_name) != _counter_map.end());
-    CollaborationCounter* counter =
-            _pool->add(new CollaborationCounter(type, level, other_counter));
+    DerivedCounter* counter = _pool->add(new DerivedCounter(type, counter_fn));
     _counter_map[name] = counter;
     std::set<std::string>* child_counters =
             find_or_insert(&_child_counter_map, parent_counter_name, 
std::set<std::string>());
@@ -454,21 +448,29 @@ RuntimeProfile::CollaborationCounter* 
RuntimeProfile::add_collaboration_counter(
     return counter;
 }
 
-RuntimeProfile::DerivedCounter* RuntimeProfile::add_derived_counter(
-        const std::string& name, TUnit::type type, const 
DerivedCounterFunction& counter_fn,
-        const std::string& parent_counter_name) {
+void RuntimeProfile::add_description(const std::string& name, const 
std::string& description,
+                                     std::string parent_counter_name) {
     std::lock_guard<std::mutex> l(_counter_map_lock);
 
     if (_counter_map.find(name) != _counter_map.end()) {
-        return nullptr;
+        Counter* counter = _counter_map[name];
+        if (dynamic_cast<DescriptionEntry*>(counter) != nullptr) {
+            // Do replace instead of update to avoid data race.
+            _counter_map.erase(name);
+        } else {
+            DCHECK(false) << "Counter type mismatch, name: " << name
+                          << ", type: " << counter->type() << ", description: 
" << description;
+        }
     }
 
-    DerivedCounter* counter = _pool->add(new DerivedCounter(type, counter_fn));
+    // Parent counter must already exist.
+    DCHECK(parent_counter_name == ROOT_COUNTER ||
+           _counter_map.find(parent_counter_name) != _counter_map.end());
+    DescriptionEntry* counter = _pool->add(new DescriptionEntry(name, 
description));
     _counter_map[name] = counter;
     std::set<std::string>* child_counters =
             find_or_insert(&_child_counter_map, parent_counter_name, 
std::set<std::string>());
     child_counters->insert(name);
-    return counter;
 }
 
 RuntimeProfile::Counter* RuntimeProfile::get_counter(const std::string& name) {
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index cae1a7cefe7..ee9a71ee900 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -295,41 +295,36 @@ public:
         const std::string _parent_name;
     };
 
-    // When the collaboration Counter modifies itself, it also modifies the 
other counter.
-
-    class CollaborationCounter : public Counter {
+    class DescriptionEntry : public Counter {
     public:
-        CollaborationCounter(TUnit::type type, int64_t level, Counter* 
other_counter,
-                             int64_t value = 0)
-                : Counter(type, value, level), _other_counter(other_counter) {}
+        DescriptionEntry(const std::string& name, const std::string& 
description)
+                : Counter(TUnit::NONE, 0, 2), _description(description), 
_name(name) {}
 
         virtual Counter* clone() const override {
-            return new CollaborationCounter(type(), level(), _other_counter, 
value());
-        }
-
-        void update(int64_t delta) override {
-            if (_other_counter != nullptr) {
-                _other_counter->update(delta);
-            }
-            Counter::update(delta);
+            return new DescriptionEntry(_name, _description);
         }
 
         void set(int64_t value) override {
-            if (_other_counter != nullptr) {
-                _other_counter->set(value);
-            }
-            Counter::set(value);
+            // Do nothing
         }
-
         void set(double value) override {
-            if (_other_counter != nullptr) {
-                _other_counter->set(value);
-            }
-            Counter::set(value);
+            // Do nothing
+        }
+        void update(int64_t delta) override {
+            // Do nothing
+        }
+
+        TCounter to_thrift(const std::string& name) const override {
+            TCounter counter;
+            counter.name = name;
+            counter.__set_level(2);
+            counter.__set_description(_description);
+            return counter;
         }
 
     private:
-        Counter* _other_counter = nullptr; // Pointer to the other counter to 
be modified
+        const std::string _description;
+        const std::string _name;
     };
 
     // Create a runtime profile object with 'name'.
@@ -385,11 +380,9 @@ public:
             const std::string& parent_counter_name = 
RuntimeProfile::ROOT_COUNTER,
             int64_t level = 2);
 
-    CollaborationCounter* add_collaboration_counter(
-            const std::string& name, TUnit::type type, Counter* other_counter,
-            const std::string& parent_counter_name = 
RuntimeProfile::ROOT_COUNTER,
-            int64_t level = 2);
-
+    // Add a description entry under target counter.
+    void add_description(const std::string& name, const std::string& 
description,
+                         std::string parent_counter_name);
     // Add a derived counter with 'name'/'type'. The counter is owned by the
     // RuntimeProfile object.
     // If parent_counter_name is a non-empty string, the counter is added as a 
child of
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp 
b/be/src/vec/exec/scan/olap_scanner.cpp
index d623a54f4f9..605cfe92c7b 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -592,9 +592,6 @@ void OlapScanner::_collect_profile_before_close() {
     COUNTER_UPDATE(local_state->_rows_short_circuit_cond_input_counter,
                    stats.short_circuit_cond_input_rows);
     COUNTER_UPDATE(local_state->_rows_expr_cond_input_counter, 
stats.expr_cond_input_rows);
-    for (const auto& [id, info] : stats.filter_info) {
-        local_state->add_filter_info(id, info);
-    }
     COUNTER_UPDATE(local_state->_stats_filtered_counter, 
stats.rows_stats_filtered);
     COUNTER_UPDATE(local_state->_stats_rp_filtered_counter, 
stats.rows_stats_rp_filtered);
     COUNTER_UPDATE(local_state->_dict_filtered_counter, 
stats.rows_dict_filtered);
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp 
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index d864dbd833a..41d2a6d76f4 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -89,8 +89,6 @@ void VRuntimeFilterWrapper::close(VExprContext* context,
 
 Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, 
int* result_column_id) {
     DCHECK(_open_finished || _getting_const_col);
-    DCHECK(_expr_filtered_rows_counter && _expr_input_rows_counter && 
_always_true_counter)
-            << "rf counter must be initialized";
     if (_judge_counter.fetch_sub(1) == 0) {
         reset_judge_selectivity();
     }
@@ -99,9 +97,7 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, 
Block* block, int*
         block->insert({create_always_true_column(size, 
_data_type->is_nullable()), _data_type,
                        expr_name()});
         *result_column_id = block->columns() - 1;
-        if (_always_true_counter) {
-            COUNTER_UPDATE(_always_true_counter, size);
-        }
+        COUNTER_UPDATE(_always_true_filter_rows, size);
         return Status::OK();
     } else {
         if (_getting_const_col) {
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h 
b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 72627bb6cd9..84b5538e130 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -17,8 +17,11 @@
 
 #pragma once
 
+#include <gen_cpp/Metrics_types.h>
+
 #include <atomic>
 #include <cstdint>
+#include <memory>
 #include <string>
 
 #include "common/config.h"
@@ -61,21 +64,27 @@ public:
 
     VExprSPtr get_impl() const override { return _impl; }
 
-    void attach_profile_counter(RuntimeProfile::Counter* 
expr_filtered_rows_counter,
-                                RuntimeProfile::Counter* 
expr_input_rows_counter,
-                                RuntimeProfile::Counter* always_true_counter,
-                                RuntimeProfile::Counter* 
predicate_filtered_rows_counter,
-                                RuntimeProfile::Counter* 
predicate_input_rows_counter) {
-        _expr_filtered_rows_counter = expr_filtered_rows_counter;
-        _expr_input_rows_counter = expr_input_rows_counter;
-        _always_true_counter = always_true_counter;
-        _predicate_filtered_rows_counter = predicate_filtered_rows_counter;
-        _predicate_input_rows_counter = predicate_input_rows_counter;
+    void attach_profile_counter(std::shared_ptr<RuntimeProfile::Counter> 
rf_input_rows,
+                                std::shared_ptr<RuntimeProfile::Counter> 
rf_filter_rows,
+                                std::shared_ptr<RuntimeProfile::Counter> 
always_true_filter_rows) {
+        DCHECK(rf_input_rows != nullptr);
+        DCHECK(rf_filter_rows != nullptr);
+        DCHECK(always_true_filter_rows != nullptr);
+
+        if (rf_input_rows != nullptr) {
+            _rf_input_rows = rf_input_rows;
+        }
+        if (rf_filter_rows != nullptr) {
+            _rf_filter_rows = rf_filter_rows;
+        }
+        if (always_true_filter_rows != nullptr) {
+            _always_true_filter_rows = always_true_filter_rows;
+        }
     }
 
     void update_counters(int64_t filter_rows, int64_t input_rows) {
-        COUNTER_UPDATE(_expr_filtered_rows_counter, filter_rows);
-        COUNTER_UPDATE(_expr_input_rows_counter, input_rows);
+        COUNTER_UPDATE(_rf_filter_rows, filter_rows);
+        COUNTER_UPDATE(_rf_input_rows, input_rows);
     }
 
     template <typename T>
@@ -100,8 +109,12 @@ public:
         }
     }
 
-    auto* predicate_filtered_rows_counter() const { return 
_predicate_filtered_rows_counter; }
-    auto* predicate_input_rows_counter() const { return 
_predicate_input_rows_counter; }
+    std::shared_ptr<RuntimeProfile::Counter> predicate_filtered_rows_counter() 
const {
+        return _rf_filter_rows;
+    }
+    std::shared_ptr<RuntimeProfile::Counter> predicate_input_rows_counter() 
const {
+        return _rf_input_rows;
+    }
 
 private:
     void reset_judge_selectivity() {
@@ -125,15 +138,12 @@ private:
     std::atomic_uint64_t _judge_filter_rows = 0;
     std::atomic_int _always_true = false;
 
-    RuntimeProfile::Counter* _expr_filtered_rows_counter = nullptr;
-    RuntimeProfile::Counter* _expr_input_rows_counter = nullptr;
-    RuntimeProfile::Counter* _always_true_counter = nullptr;
-
-    // Used to record filtering information on predicates.
-    // The transfer relationship of these counters is:
-    // RuntimeFilterConsumer(create) ==> VRuntimeFilterWrapper(pass) ==> 
FilterOlapParam(pass) ==> ColumnPredicate(record)
-    RuntimeProfile::Counter* _predicate_filtered_rows_counter = nullptr;
-    RuntimeProfile::Counter* _predicate_input_rows_counter = nullptr;
+    std::shared_ptr<RuntimeProfile::Counter> _rf_input_rows =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
+    std::shared_ptr<RuntimeProfile::Counter> _rf_filter_rows =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
+    std::shared_ptr<RuntimeProfile::Counter> _always_true_filter_rows =
+            std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
 
     std::string _expr_name;
     double _ignore_thredhold;
diff --git a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp 
b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
index 46a50c3f363..215de7d8358 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
@@ -78,8 +78,8 @@ TEST_F(RuntimeFilterConsumerHelperTest, basic) {
     
const_cast<std::vector<TupleDescriptor*>&>(row_desc._tuple_desc_map).push_back(&tuple_desc);
     auto helper = RuntimeFilterConsumerHelper(0, runtime_filter_descs, 
row_desc);
 
-    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.init(_runtime_states[0].get(), 
&_profile, true,
-                                                 runtime_filter_dependencies, 
0, 0, ""));
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.init(_runtime_states[0].get(), true, 
runtime_filter_dependencies, 0, 0, ""));
 
     vectorized::VExprContextSPtrs conjuncts;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.acquire_runtime_filter(conjuncts));
diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp 
b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
index de6882b5aca..215edc78a30 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
@@ -30,9 +30,8 @@ class RuntimeFilterConsumerTest : public RuntimeFilterTest {
 public:
     void test_signal_aquire(TRuntimeFilterDesc desc) {
         std::shared_ptr<RuntimeFilterConsumer> consumer;
-        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
-                                              &desc, 0, &consumer, &_profile));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
+                RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
0, &consumer));
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
@@ -60,11 +59,11 @@ TEST_F(RuntimeFilterConsumerTest, basic) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
-            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
-            desc, true, 0, &registed_consumer, &_profile));
+            desc, true, 0, &registed_consumer));
 }
 
 TEST_F(RuntimeFilterConsumerTest, signal_aquire_in_or_bloom) {
@@ -117,7 +116,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
-            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
@@ -142,18 +141,18 @@ TEST_F(RuntimeFilterConsumerTest, wait_infinity) {
     const_cast<TQueryOptions&>(_query_ctx->_query_options)
             .__set_runtime_filter_wait_infinitely(true);
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
-            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
-            desc, true, 0, &registed_consumer, &_profile));
+            desc, true, 0, &registed_consumer));
 }
 
 TEST_F(RuntimeFilterConsumerTest, aquire_disabled) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
-            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
@@ -171,7 +170,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_ignored) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
     auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
-            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer));
 
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
@@ -193,9 +192,8 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
     std::shared_ptr<RuntimeFilterConsumer> consumer;
 
     {
-        auto st =
-                
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
-                                              &desc, 0, &consumer, &_profile);
+        auto st = RuntimeFilterConsumer::create(
+                RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
0, &consumer);
         ASSERT_FALSE(st.ok());
     }
     desc.__set_src_expr(
@@ -216,17 +214,15 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
                     .build());
 
     {
-        auto st =
-                
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
-                                              &desc, 0, &consumer, &_profile);
+        auto st = RuntimeFilterConsumer::create(
+                RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
0, &consumer);
         ASSERT_FALSE(st.ok());
     }
     {
         desc.__set_has_local_targets(false);
         desc.__set_has_remote_targets(true);
-        auto st =
-                
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
-                                              &desc, 0, &consumer, &_profile);
+        auto st = RuntimeFilterConsumer::create(
+                RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
0, &consumer);
         ASSERT_FALSE(st.ok());
         desc.__set_has_local_targets(true);
         desc.__set_has_remote_targets(false);
@@ -234,16 +230,15 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
 
     
desc.__set_bitmap_target_expr(TRuntimeFilterDescBuilder::get_default_expr());
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
-            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer));
 }
 
 TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) {
     for (int i = 0; i < 100; i++) {
         std::shared_ptr<RuntimeFilterConsumer> consumer;
         auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
-        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-                
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
-                                              &desc, 0, &consumer, &_profile));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
+                RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
0, &consumer));
 
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp 
b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
index 253631dd45a..0e1bdea1469 100644
--- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
@@ -65,8 +65,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
         // Get / Register consumer
         
EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
         std::shared_ptr<RuntimeFilterConsumer> consumer_filter;
-        EXPECT_TRUE(global_runtime_filter_mgr
-                            ->register_consumer_filter(desc, 0, 
&consumer_filter, profile.get())
+        EXPECT_TRUE(global_runtime_filter_mgr->register_consumer_filter(desc, 
0, &consumer_filter)
                             .ok());
         
EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
     }
diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_test.cpp
index 8122004ed58..4a367677bf4 100644
--- a/be/test/runtime_filter/runtime_filter_producer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp
@@ -106,8 +106,8 @@ TEST_F(RuntimeFilterProducerTest, 
sync_filter_size_local_merge) {
             _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2, &_profile));
 
     std::shared_ptr<RuntimeFilterConsumer> consumer;
-    
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
-            desc, true, 0, &consumer, &_profile));
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[1]->register_consumer_runtime_filter(desc, true, 
0, &consumer));
 
     ASSERT_EQ(producer->_need_sync_filter_size, true);
     ASSERT_EQ(producer->_rf_state, 
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
@@ -141,8 +141,8 @@ TEST_F(RuntimeFilterProducerTest, set_ignore_or_disable) {
             _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2, &_profile));
 
     std::shared_ptr<RuntimeFilterConsumer> consumer;
-    
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
-            desc, true, 0, &consumer, &_profile));
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[1]->register_consumer_runtime_filter(desc, true, 
0, &consumer));
 
     ASSERT_EQ(producer->_need_sync_filter_size, true);
     ASSERT_EQ(producer->_rf_state, 
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
@@ -183,8 +183,8 @@ TEST_F(RuntimeFilterProducerTest, 
sync_filter_size_local_merge_with_ignored) {
             _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2, &_profile));
 
     std::shared_ptr<RuntimeFilterConsumer> consumer;
-    
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
-            desc, true, 0, &consumer, &_profile));
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[1]->register_consumer_runtime_filter(desc, true, 
0, &consumer));
 
     ASSERT_EQ(producer->_need_sync_filter_size, true);
     ASSERT_EQ(producer->_rf_state, 
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
diff --git a/be/test/util/runtime_profile_counter_tree_node_test.cpp 
b/be/test/util/runtime_profile_counter_tree_node_test.cpp
index 0dcb34768aa..5226841b4dd 100644
--- a/be/test/util/runtime_profile_counter_tree_node_test.cpp
+++ b/be/test/util/runtime_profile_counter_tree_node_test.cpp
@@ -279,34 +279,70 @@ TEST_F(RuntimeProfileCounterTreeNodeTest, 
NonZeroCounterToThrfit) {
     ASSERT_EQ(child_counter_map[RuntimeProfile::ROOT_COUNTER].size(), 0);
 }
 
-TEST_F(RuntimeProfileCounterTreeNodeTest, CollaborationCounterTest) {
-    auto root_counter = std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT);
-    auto child_counter1 = 
std::make_unique<RuntimeProfile::CollaborationCounter>(
-            TUnit::UNIT, 2, root_counter.get());
-    auto child_counter2 = 
std::make_unique<RuntimeProfile::CollaborationCounter>(
-            TUnit::UNIT, 2, root_counter.get());
-
-    auto c1 = 
std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2,
-                                                                     
child_counter1.get());
-    auto c2 = 
std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2,
-                                                                     
child_counter1.get());
-    auto c3 = 
std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2,
-                                                                     
child_counter2.get());
-    auto c4 = 
std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2,
-                                                                     
child_counter2.get());
-
-    c1->update(1);
-    c2->update(10);
-    c3->update(100);
-    c4->update(1000);
-
-    ASSERT_EQ(root_counter->value(), 1111);
-    ASSERT_EQ(child_counter1->value(), 11);
-    ASSERT_EQ(child_counter2->value(), 1100);
-    ASSERT_EQ(c1->value(), 1);
-    ASSERT_EQ(c2->value(), 10);
-    ASSERT_EQ(c3->value(), 100);
-    ASSERT_EQ(c4->value(), 1000);
+TEST_F(RuntimeProfileCounterTreeNodeTest, DescriptionCounter) {
+    RuntimeProfile::CounterMap counterMap;
+    RuntimeProfile::ChildCounterMap childCounterMap;
+    /*
+    ""
+        "root"
+            "description_entry"
+    */
+
+    auto rootCounter = std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT);
+    auto descriptionEntry = std::make_unique<RuntimeProfile::DescriptionEntry>(
+            "description_entry", "Updated description");
+
+    counterMap["root"] = rootCounter.get();
+    counterMap["description_entry"] = descriptionEntry.get();
+
+    childCounterMap[RuntimeProfile::ROOT_COUNTER].insert("root");
+    childCounterMap["root"].insert("description_entry");
+
+    RuntimeProfileCounterTreeNode rootNode = 
RuntimeProfileCounterTreeNode::from_map(
+            counterMap, childCounterMap, RuntimeProfile::ROOT_COUNTER);
+
+    std::vector<TCounter> tcounter;
+    std::map<std::string, std::set<std::string>> child_counter_map;
+
+    rootNode.to_thrift(tcounter, child_counter_map);
+
+    /*
+    ROOT_COUNTER
+        root
+            description_entry
+    */
+
+    /* 
+    tcounter: root, description_entry
+    child_counter_map:
+        ROOT_COUNTER -> {root}
+        root -> {description_entry}
+    */
+
+    for (const auto& counter : tcounter) {
+        std::cout << "Counter: " << counter.name;
+        if (counter.name == "description_entry") {
+            EXPECT_TRUE(counter.__isset.description);
+            EXPECT_EQ(counter.description, "Updated description");
+        }
+        if (counter.__isset.description) {
+            std::cout << ", Description: " << counter.description;
+        }
+        std::cout << std::endl;
+    }
+
+    ASSERT_EQ(tcounter.size(), 2);
+    EXPECT_EQ(tcounter[0].name, "root");
+    EXPECT_EQ(tcounter[1].name, "description_entry");
+
+    ASSERT_TRUE(tcounter[1].__isset.description);
+    EXPECT_EQ(tcounter[1].description, "Updated description");
+    EXPECT_EQ(tcounter[1].level, 2);
+
+    ASSERT_EQ(child_counter_map.size(), 2);
+    ASSERT_EQ(child_counter_map[RuntimeProfile::ROOT_COUNTER].size(), 1);
+    ASSERT_EQ(child_counter_map["root"].size(), 1);
+    ASSERT_EQ(*child_counter_map["root"].begin(), "description_entry");
 }
 
 } // namespace doris
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
index f306d7c73fb..d4a5e6e88dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TUnit;
 
+import com.google.common.base.Strings;
 import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
@@ -35,6 +36,8 @@ public class Counter {
     private volatile int type;
     @SerializedName(value = "level")
     private volatile long level;
+    @SerializedName(value = "description")
+    private volatile String description;
 
     public static Counter read(DataInput input) throws IOException {
         return GsonUtils.GSON.fromJson(Text.readString(input), Counter.class);
@@ -77,6 +80,13 @@ public class Counter {
         this.level = level;
     }
 
+    public Counter(String description) {
+        this.description = description;
+        this.value = 0;
+        // Make sure not merge.
+        this.level = 2;
+    }
+
     public void addValue(Counter other) {
         if (other == null) {
             return;
@@ -115,7 +125,11 @@ public class Counter {
     }
 
     public String print() {
-        return RuntimeProfile.printCounter(value, getType());
+        if (Strings.isNullOrEmpty(description)) {
+            return RuntimeProfile.printCounter(value, getType());
+        } else {
+            return description;
+        }
     }
 
     public String toString() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
index c607dc570d0..cfd253f2faa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
@@ -267,12 +267,18 @@ public class RuntimeProfile {
                 // If different node has counter with the same name, it will 
lead to chaos.
                 Counter counter = this.counterMap.get(tcounter.name);
                 if (counter == null) {
-                    counterMap.put(tcounter.name, new Counter(tcounter.type, 
tcounter.value, tcounter.level));
+                    if (tcounter.isSetDescription()) {
+                        counterMap.put(tcounter.name, new 
Counter(tcounter.description));
+                    } else {
+                        counterMap.put(tcounter.name, new 
Counter(tcounter.type, tcounter.value, tcounter.level));
+                    }
                 } else {
                     counter.setLevel(tcounter.level);
                     if (counter.getType() != tcounter.type) {
                         LOG.error("Cannot update counters with the same name 
but different types"
                                 + " type=" + tcounter.type);
+                    } else if (tcounter.isSetDescription()) {
+                        continue;
                     } else {
                         counter.setValue(tcounter.type, tcounter.value);
                     }
diff --git a/gensrc/thrift/RuntimeProfile.thrift 
b/gensrc/thrift/RuntimeProfile.thrift
index 764db39f7d2..28be3fab060 100644
--- a/gensrc/thrift/RuntimeProfile.thrift
+++ b/gensrc/thrift/RuntimeProfile.thrift
@@ -26,6 +26,7 @@ struct TCounter {
   2: required Metrics.TUnit type
   3: required i64 value 
   4: optional i64 level
+  5: optional string description
 }
 
 // A single runtime profile


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to