This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b0b7161ad01ab69ef635ba02a53ac100ea669de7
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Mon Mar 11 10:35:56 2024 +0800

    [feature](rf) add filter info profile when rf run as expr (#31822)
---
 be/src/exprs/runtime_filter.cpp             | 20 +++++++++++++-----
 be/src/exprs/runtime_filter.h               |  3 ++-
 be/src/vec/exec/runtime_filter_consumer.cpp |  9 ++++----
 be/src/vec/exec/runtime_filter_consumer.h   |  6 +++---
 be/src/vec/exec/scan/new_olap_scan_node.cpp |  4 ++--
 be/src/vec/exprs/vruntimefilter_wrapper.cpp | 32 +++++++++++++++++++++--------
 be/src/vec/exprs/vruntimefilter_wrapper.h   | 19 ++++++++++++++---
 7 files changed, 67 insertions(+), 26 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index a949969ca65..6e4f57ef8e1 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -444,7 +444,8 @@ public:
     }
 
     Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
-                          std::vector<vectorized::VExprSPtr>& push_exprs, const TExpr& probe_expr);
+                          std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
+                          const TExpr& probe_expr);
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
         bool can_not_merge_in_or_bloom =
@@ -1056,14 +1057,23 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo
 }
 
 Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
-                                          std::vector<vectorized::VExprSPtr>& push_exprs,
+                                          std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
                                           bool is_late_arrival) {
     DCHECK(is_consumer());
+    auto origin_size = push_exprs.size();
     if (!_wrapper->is_ignored()) {
         _set_push_down(!is_late_arrival);
         RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr));
     }
     _profile->add_info_string("Info", _format_status());
+    // The runtime filter is pushed down, adding filtering information.
+    auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "expr_filtered_rows", TUnit::UNIT);
+    auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", TUnit::UNIT);
+    auto* always_true_counter = ADD_COUNTER(_profile, "always_true", TUnit::UNIT);
+    for (auto i = origin_size; i < push_exprs.size(); i++) {
+        push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, expr_input_rows_counter,
+                                              always_true_counter);
+    }
     return Status::OK();
 }
 
@@ -1715,9 +1725,9 @@ void IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t mer
     this->signal();
 }
 
-Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
-                                               std::vector<vectorized::VExprSPtr>& container,
-                                               const TExpr& probe_expr) {
+Status RuntimePredicateWrapper::get_push_exprs(
+        std::list<vectorized::VExprContextSPtr>& probe_ctxs,
+        std::vector<vectorized::VRuntimeFilterPtr>& container, const TExpr& probe_expr) {
     vectorized::VExprContextSPtr probe_ctx;
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, probe_ctx));
     probe_ctxs.push_back(probe_ctx);
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 5cfc88f4ed8..fe5ddd68da6 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -233,7 +233,8 @@ public:
     PrimitiveType column_type() const;
 
     Status get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
-                              std::vector<vectorized::VExprSPtr>& push_exprs, bool is_late_arrival);
+                              std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
+                              bool is_late_arrival);
 
     bool is_broadcast_join() const { return _is_broadcast_join; }
 
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp
index 52caf84e361..097df801615 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -85,7 +85,7 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
 
 Status RuntimeFilterConsumer::_acquire_runtime_filter() {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
-    VExprSPtrs vexprs;
+    std::vector<vectorized::VRuntimeFilterPtr> vexprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
         bool ready = runtime_filter->is_ready();
@@ -111,12 +111,13 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() {
     return Status::OK();
 }
 
-Status RuntimeFilterConsumer::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
+Status RuntimeFilterConsumer::_append_rf_into_conjuncts(
+        const std::vector<vectorized::VRuntimeFilterPtr>& vexprs) {
     if (vexprs.empty()) {
         return Status::OK();
     }
 
-    for (auto& expr : vexprs) {
+    for (const auto& expr : vexprs) {
         VExprContextSPtr conjunct = VExprContext::create_shared(expr);
         RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref));
         RETURN_IF_ERROR(conjunct->open(_state));
@@ -142,7 +143,7 @@ Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrive
     }
 
     // 1. Check if are runtime filter ready but not applied.
-    VExprSPtrs exprs;
+    std::vector<vectorized::VRuntimeFilterPtr> exprs;
     int current_arrived_rf_num = 0;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         if (_runtime_filter_ctxs[i].apply_mark) {
diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h
index b8513e666bc..86609624be6 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -46,7 +46,7 @@ protected:
     // Get all arrived runtime filters at Open phase.
     Status _acquire_runtime_filter();
     // Append late-arrival runtime filters to the vconjunct_ctx.
-    Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs);
+    Status _append_rf_into_conjuncts(const std::vector<vectorized::VRuntimeFilterPtr>& vexprs);
 
     void _init_profile(RuntimeProfile* profile);
 
@@ -54,9 +54,9 @@ protected:
 
     // For runtime filters
     struct RuntimeFilterContext {
-        RuntimeFilterContext(IRuntimeFilter* rf) : apply_mark(false), runtime_filter(rf) {}
+        RuntimeFilterContext(IRuntimeFilter* rf) : runtime_filter(rf) {}
         // set to true if this runtime filter is already applied to vconjunct_ctx_ptr
-        bool apply_mark;
+        bool apply_mark = false;
         IRuntimeFilter* runtime_filter = nullptr;
     };
 
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 44a435ed2a2..7473820abef 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -757,8 +757,8 @@ void NewOlapScanNode::add_filter_info(int id, const PredicateFilterInfo& update_
     filter_name += std::to_string(id);
     std::string info_str;
     info_str += "type = " + type_to_string(static_cast<PredicateType>(info.type)) + ", ";
-    info_str += "input = " + std::to_string(info.input_row) + ", ";
-    info_str += "filtered = " + std::to_string(info.filtered_row);
+    info_str += "predicate input = " + std::to_string(info.input_row) + ", ";
+    info_str += "predicate filtered = " + std::to_string(info.filtered_row);
     info_str = "[" + info_str + "]";
 
     // add info
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index 075e5194866..dcf3cba72e2 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -20,9 +20,12 @@
 #include <fmt/format.h>
 #include <stddef.h>
 
+#include <cstdint>
 #include <memory>
 #include <utility>
 
+#include "util/defer_op.h"
+#include "util/runtime_profile.h"
 #include "util/simd/bits.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_const.h"
@@ -84,7 +87,19 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int*
         *result_column_id = num_columns_without_result;
         return Status::OK();
     } else {
-        _scan_rows += block->rows();
+        int64_t input_rows = 0, filter_rows = 0;
+        Defer statistic_filter_info {[&]() {
+            if (_expr_filtered_rows_counter) {
+                COUNTER_UPDATE(_expr_filtered_rows_counter, filter_rows);
+            }
+            if (_expr_input_rows_counter) {
+                COUNTER_UPDATE(_expr_input_rows_counter, input_rows);
+            }
+            if (_always_true_counter) {
+                COUNTER_SET(_always_true_counter, (int64_t)_always_true);
+            }
+        }};
+        input_rows += block->rows();
 
         if (_getting_const_col) {
             _impl->set_getting_const_col(true);
@@ -99,28 +114,29 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int*
         if (is_column_const(*result_column.column)) {
             auto* constant_val = const_cast<char*>(result_column.column->get_data_at(0).data);
             if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) {
-                _filtered_rows += block->rows();
+                filter_rows += block->rows();
             }
         } else if (const auto* nullable =
                            check_and_get_column<ColumnNullable>(*result_column.column)) {
             data = ((ColumnVector<UInt8>*)nullable->get_nested_column_ptr().get())
                            ->get_data()
                            .data();
-            _filtered_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data),
-                                                          nullable->get_null_map_data().data(),
-                                                          block->rows());
+            filter_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data),
+                                                       nullable->get_null_map_data().data(),
+                                                       block->rows());
         } else if (const auto* res_col =
                            check_and_get_column<ColumnVector<UInt8>>(*result_column.column)) {
             data = const_cast<uint8_t*>(res_col->get_data().data());
-            _filtered_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data),
-                                                          block->rows());
+            filter_rows += doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data),
+                                                       block->rows());
         } else {
             return Status::InternalError(
                     "Invalid type for runtime filters!, and _expr_name is: {}. _data_type is: {}. "
                     "result_column_id is: {}. block structure: {}.",
                     _expr_name, _data_type->get_name(), *result_column_id, block->dump_structure());
         }
-
+        _filtered_rows += filter_rows;
+        _scan_rows += input_rows;
         calculate_filter(VRuntimeFilterWrapper::EXPECTED_FILTER_RATE, _filtered_rows, _scan_rows,
                          _has_calculate_filter, _always_true);
         return Status::OK();
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 6d50b914beb..30017850028 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -17,9 +17,8 @@
 
 #pragma once
 
-#include <stdint.h>
-
 #include <atomic>
+#include <cstdint>
 #include <string>
 #include <vector>
 
@@ -27,6 +26,7 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "udf/udf.h"
+#include "util/runtime_profile.h"
 #include "vec/exprs/vexpr.h"
 
 namespace doris {
@@ -57,6 +57,14 @@ public:
 
     const VExprSPtr get_impl() const override { return _impl; }
 
+    void attach_profile_counter(RuntimeProfile::Counter* expr_filtered_rows_counter,
+                                RuntimeProfile::Counter* expr_input_rows_counter,
+                                RuntimeProfile::Counter* always_true_counter) {
+        _expr_filtered_rows_counter = expr_filtered_rows_counter;
+        _expr_input_rows_counter = expr_input_rows_counter;
+        _always_true_counter = always_true_counter;
+    }
+
     // if filter rate less than this, bloom filter will set always true
     constexpr static double EXPECTED_FILTER_RATE = 0.4;
 
@@ -74,12 +82,17 @@ private:
     VExprSPtr _impl;
 
     bool _always_true;
-    /// TODO: statistic filter rate in the profile
     std::atomic<int64_t> _filtered_rows;
     std::atomic<int64_t> _scan_rows;
 
+    RuntimeProfile::Counter* _expr_filtered_rows_counter = nullptr;
+    RuntimeProfile::Counter* _expr_input_rows_counter = nullptr;
+    RuntimeProfile::Counter* _always_true_counter = nullptr;
     bool _has_calculate_filter = false;
 
     std::string _expr_name;
 };
+
+using VRuntimeFilterPtr = std::shared_ptr<VRuntimeFilterWrapper>;
+
 } // namespace doris::vectorized
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to