This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch tpc_preview4-external2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d9a931a68bc6c8aad86b5bb3a982a8fad6a64b2b
Author: Mryange <[email protected]>
AuthorDate: Fri Dec 26 17:58:12 2025 +0800

    pick 58636
---
 be/src/common/config.cpp                           |   2 +-
 be/src/olap/column_predicate.h                     |   5 +-
 be/src/runtime_filter/runtime_filter_selectivity.h |  96 +++++++++
 be/src/vec/exprs/vexpr.cpp                         |  55 +++++
 be/src/vec/exprs/vexpr.h                           |  10 +-
 be/src/vec/exprs/vexpr_context.cpp                 |  90 +--------
 be/src/vec/exprs/vexpr_context.h                   |   9 +
 be/src/vec/exprs/vruntimefilter_wrapper.cpp        | 117 ++++++++---
 be/src/vec/exprs/vruntimefilter_wrapper.h          |  46 +----
 .../runtime_filter_selectivity_test.cpp            | 222 +++++++++++++++++++++
 10 files changed, 495 insertions(+), 157 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 29f84798fcf..03a25a6f891 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1037,7 +1037,7 @@ DEFINE_mInt64(big_column_size_buffer, "65535");
 DEFINE_mInt64(small_column_size_buffer, "100");
 
 // Perform the always_true check at intervals determined by runtime_filter_sampling_frequency
-DEFINE_mInt32(runtime_filter_sampling_frequency, "64");
+DEFINE_mInt32(runtime_filter_sampling_frequency, "32");
 DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600");
 DEFINE_mBool(execution_ignore_eovercrowded, "true");
 // cooldown task configs
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 6e6671ff337..7162a96399d 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -25,6 +25,7 @@
 #include "olap/rowset/segment_v2/bloom_filter.h"
 #include "olap/rowset/segment_v2/inverted_index_iterator.h"
 #include "runtime/define_primitive_type.h"
+#include "runtime_filter/runtime_filter_selectivity.h"
 #include "util/defer_op.h"
 #include "util/runtime_profile.h"
 #include "vec/columns/column.h"
@@ -372,8 +373,8 @@ protected:
         if (!_always_true) {
             _judge_filter_rows += filter_rows;
             _judge_input_rows += input_rows;
-            vectorized::VRuntimeFilterWrapper::judge_selectivity(
-                    get_ignore_threshold(), _judge_filter_rows, _judge_input_rows, _always_true);
+            RuntimeFilterSelectivity::judge_selectivity(get_ignore_threshold(), _judge_filter_rows,
+                                                        _judge_input_rows, _always_true);
         }
     }
 
diff --git a/be/src/runtime_filter/runtime_filter_selectivity.h b/be/src/runtime_filter/runtime_filter_selectivity.h
new file mode 100644
index 00000000000..1b0a82143de
--- /dev/null
+++ b/be/src/runtime_filter/runtime_filter_selectivity.h
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "common/config.h"
+#include "common/logging.h"
+
+namespace doris {
+
+// Tracks the selectivity of a runtime filter. If the selectivity is very low, the filter is
+// considered ineffective and can be ignored. Because selectivity may change as the data varies,
+// the statistics are tracked dynamically and reset periodically (see update_judge_counter()).
+// Note: this is not a thread-safe class
+class RuntimeFilterSelectivity {
+public:
+    RuntimeFilterSelectivity() = default;
+
+    RuntimeFilterSelectivity(const RuntimeFilterSelectivity&) = delete;
+    RuntimeFilterSelectivity& operator=(const RuntimeFilterSelectivity&) = delete;
+    void update_judge_counter() {
+        if ((_judge_counter++) >= config::runtime_filter_sampling_frequency) {
+            reset_judge_selectivity();
+        }
+    }
+
+    void update_judge_selectivity(int filter_id, uint64_t filter_rows, uint64_t input_rows,
+                                  double ignore_threshold) {
+        if (!_always_true) {
+            _judge_filter_rows += filter_rows;
+            _judge_input_rows += input_rows;
+            judge_selectivity(ignore_threshold, _judge_filter_rows, _judge_input_rows,
+                              _always_true);
+        }
+
+        VLOG_ROW << fmt::format(
+                "Runtime filter[{}] selectivity update: filter_rows: {}, input_rows: {},  filter "
+                "rate: {}, ignore_thredhold: {}, counter: {} , always_true: {}",
+                filter_id, _judge_filter_rows, _judge_input_rows,
+                _judge_input_rows > 0 ? static_cast<double>(_judge_filter_rows) /
+                                                static_cast<double>(_judge_input_rows)
+                                      : 0.0,
+                ignore_threshold, _judge_counter, _always_true);
+    }
+
+    bool maybe_always_true_can_ignore() const {
+        /// TODO: maybe we can use session variable to control this behavior ?
+        if (config::runtime_filter_sampling_frequency <= 0) {
+            return false;
+        } else {
+            return _always_true;
+        }
+    }
+
+    static void judge_selectivity(double ignore_threshold, int64_t filter_rows, int64_t input_rows,
+                                  bool& always_true) {
+        // if the judged input rows is too small, we think the selectivity is not reliable
+        if (input_rows > min_judge_input_rows) {
+            always_true = (static_cast<double>(filter_rows) / static_cast<double>(input_rows)) <
+                          ignore_threshold;
+        }
+    }
+
+private:
+    void reset_judge_selectivity() {
+        _always_true = false;
+        _judge_counter = 0;
+        _judge_input_rows = 0;
+        _judge_filter_rows = 0;
+    }
+
+    int64_t _judge_input_rows = 0;
+    int64_t _judge_filter_rows = 0;
+    int _judge_counter = 0;
+    bool _always_true = false;
+
+    constexpr static int64_t min_judge_input_rows = 4096 * 10;
+};
+
+} // namespace doris
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 1bafe01ad71..52d4ca01eac 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -1015,5 +1015,60 @@ bool VExpr::ann_dist_is_fulfilled() const {
     return _virtual_column_is_fulfilled;
 }
 
+Status VExpr::execute_filter(VExprContext* context, const Block* block,
+                             uint8_t* __restrict result_filter_data, size_t rows, bool accept_null,
+                             bool* can_filter_all) const {
+    ColumnPtr filter_column;
+    RETURN_IF_ERROR(execute_column(context, block, filter_column));
+    if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
+        // const(nullable) or const(bool)
+        const bool result = accept_null
+                                    ? (const_column->is_null_at(0) || const_column->get_bool(0))
+                                    : (!const_column->is_null_at(0) && const_column->get_bool(0));
+        if (!result) {
+            // filter all
+            *can_filter_all = true;
+            memset(result_filter_data, 0, rows);
+            return Status::OK();
+        }
+    } else if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
+        // nullable(bool)
+        const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
+        const IColumn::Filter& filter = assert_cast<const ColumnUInt8&>(*nested_column).get_data();
+        const auto* __restrict filter_data = filter.data();
+        const auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
+
+        if (accept_null) {
+            for (size_t i = 0; i < rows; ++i) {
+                result_filter_data[i] &= (null_map_data[i]) || filter_data[i];
+            }
+        } else {
+            for (size_t i = 0; i < rows; ++i) {
+                result_filter_data[i] &= (!null_map_data[i]) & filter_data[i];
+            }
+        }
+
+        if (memchr(result_filter_data, 0x1, rows) == nullptr) {
+            *can_filter_all = true;
+            return Status::OK();
+        }
+    } else {
+        // bool
+        const IColumn::Filter& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data();
+        const auto* __restrict filter_data = filter.data();
+
+        for (size_t i = 0; i < rows; ++i) {
+            result_filter_data[i] &= filter_data[i];
+        }
+
+        if (memchr(result_filter_data, 0x1, rows) == nullptr) {
+            *can_filter_all = true;
+            return Status::OK();
+        }
+    }
+
+    return Status::OK();
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 35a0d3733b0..2a0abe439f9 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -147,6 +147,10 @@ public:
     // Therefore we need a function like this to return the actual type produced by execution.
     virtual DataTypePtr execute_type(const Block* block) const { return _data_type; }
 
+    virtual Status execute_filter(VExprContext* context, const Block* block,
+                                  uint8_t* __restrict result_filter_data, size_t rows,
+                                  bool accept_null, bool* can_filter_all) const;
+
     // `is_blockable` means this expr will be blocked in `execute` (e.g. AI Function, Remote Function)
     [[nodiscard]] virtual bool is_blockable() const {
         return std::any_of(_children.begin(), _children.end(),
@@ -204,12 +208,6 @@ public:
                                    [](VExprSPtr child) { return child->is_rf_wrapper(); });
     }
 
-    virtual void do_judge_selectivity(uint64_t filter_rows, uint64_t input_rows) {
-        for (auto child : _children) {
-            child->do_judge_selectivity(filter_rows, input_rows);
-        }
-    }
-
     static Status create_expr_tree(const TExpr& texpr, VExprContextSPtr& ctx);
 
     static Status create_expr_trees(const std::vector<TExpr>& texprs, VExprContextSPtrs& ctxs);
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index a7b71b77646..2a9c049e303 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -199,7 +199,12 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
     return execute_conjuncts(ctxs, filters, false, block, result_filter, can_filter_all);
 }
 
-// TODO: Performance Optimization
+Status VExprContext::execute_filter(const Block* block, uint8_t* __restrict result_filter_data,
+                                    size_t rows, bool accept_null, bool* can_filter_all) {
+    return _root->execute_filter(this, block, result_filter_data, rows, accept_null,
+                                 can_filter_all);
+}
+
 Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
                                        const std::vector<IColumn::Filter*>* filters,
                                        bool accept_null, const Block* block,
@@ -209,85 +214,10 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
     *can_filter_all = false;
     auto* __restrict result_filter_data = result_filter->data();
     for (const auto& ctx : ctxs) {
-        // Statistics are only required when an rf wrapper exists in the expr.
-        bool is_rf_wrapper = ctx->root()->is_rf_wrapper();
-        ColumnPtr filter_column;
-        RETURN_IF_ERROR(ctx->execute(block, filter_column));
-        if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
-            size_t column_size = nullable_column->size();
-            if (column_size == 0) {
-                *can_filter_all = true;
-                return Status::OK();
-            } else {
-                const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
-                const IColumn::Filter& filter =
-                        assert_cast<const ColumnUInt8&>(*nested_column).get_data();
-                const auto* __restrict filter_data = filter.data();
-                const auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
-
-                size_t input_rows =
-                        rows - (is_rf_wrapper
-                                        ? simd::count_zero_num((int8_t*)result_filter_data, rows)
-                                        : 0);
-
-                if (accept_null) {
-                    for (size_t i = 0; i < rows; ++i) {
-                        result_filter_data[i] &= (null_map_data[i]) || filter_data[i];
-                    }
-                } else {
-                    for (size_t i = 0; i < rows; ++i) {
-                        result_filter_data[i] &= (!null_map_data[i]) & filter_data[i];
-                    }
-                }
-
-                size_t output_rows =
-                        rows - (is_rf_wrapper
-                                        ? simd::count_zero_num((int8_t*)result_filter_data, rows)
-                                        : 0);
-
-                if (is_rf_wrapper) {
-                    ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows);
-                }
-
-                if ((is_rf_wrapper && output_rows == 0) ||
-                    (!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) {
-                    *can_filter_all = true;
-                    return Status::OK();
-                }
-            }
-        } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
-            // filter all
-            if (!const_column->get_bool(0)) {
-                *can_filter_all = true;
-                memset(result_filter_data, 0, result_filter->size());
-                return Status::OK();
-            }
-        } else {
-            const IColumn::Filter& filter =
-                    assert_cast<const ColumnUInt8&>(*filter_column).get_data();
-            const auto* __restrict filter_data = filter.data();
-
-            size_t input_rows =
-                    rows -
-                    (is_rf_wrapper ? simd::count_zero_num((int8_t*)result_filter_data, rows) : 0);
-
-            for (size_t i = 0; i < rows; ++i) {
-                result_filter_data[i] &= filter_data[i];
-            }
-
-            size_t output_rows =
-                    rows -
-                    (is_rf_wrapper ? simd::count_zero_num((int8_t*)result_filter_data, rows) : 0);
-
-            if (is_rf_wrapper) {
-                ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows);
-            }
-
-            if ((is_rf_wrapper && output_rows == 0) ||
-                (!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) {
-                *can_filter_all = true;
-                return Status::OK();
-            }
+        RETURN_IF_ERROR(
+                ctx->execute_filter(block, result_filter_data, rows, accept_null, can_filter_all));
+        if (*can_filter_all) {
+            return Status::OK();
         }
     }
     if (filters != nullptr) {
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 3179526ec54..349f199af23 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -33,6 +33,7 @@
 #include "olap/rowset/segment_v2/inverted_index_reader.h"
 #include "runtime/runtime_state.h"
 #include "runtime/types.h"
+#include "runtime_filter/runtime_filter_selectivity.h"
 #include "udf/udf.h"
 #include "vec/columns/column.h"
 #include "vec/core/block.h"
@@ -210,6 +211,9 @@ public:
 
     bool all_expr_inverted_index_evaluated();
 
+    Status execute_filter(const Block* block, uint8_t* __restrict result_filter_data, size_t rows,
+                          bool accept_null, bool* can_filter_all);
+
     [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block);
 
     [[nodiscard]] static Status filter_block(const VExprContextSPtrs& expr_contexts, Block* block,
@@ -246,6 +250,8 @@ public:
         return _last_result_column_id;
     }
 
+    RuntimeFilterSelectivity& get_runtime_filter_selectivity() { return *_rf_selectivity; }
+
     FunctionContext::FunctionStateScope get_function_state_scope() const {
         return _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL;
     }
@@ -337,5 +343,8 @@ private:
 
     segment_v2::AnnRangeSearchRuntime _ann_range_search_runtime;
     bool _suitable_for_ann_index = true;
+
+    std::unique_ptr<RuntimeFilterSelectivity> _rf_selectivity =
+            std::make_unique<RuntimeFilterSelectivity>();
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index 8e915ffff67..b24df4860da 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -62,9 +62,7 @@ VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr im
           _impl(std::move(impl)),
           _ignore_thredhold(ignore_thredhold),
           _null_aware(null_aware),
-          _filter_id(filter_id) {
-    reset_judge_selectivity();
-}
+          _filter_id(filter_id) {}
 
 Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
                                       VExprContext* context) {
@@ -89,38 +87,105 @@ void VRuntimeFilterWrapper::close(VExprContext* context,
 
 Status VRuntimeFilterWrapper::execute_column(VExprContext* context, const Block* block,
                                              ColumnPtr& result_column) const {
-    DCHECK(_open_finished || _getting_const_col);
-    if (_judge_counter.fetch_sub(1) == 0) {
-        reset_judge_selectivity();
+    return Status::InternalError("Not implement VRuntimeFilterWrapper::execute_column");
+}
+
+const std::string& VRuntimeFilterWrapper::expr_name() const {
+    return _expr_name;
+}
+
+Status VRuntimeFilterWrapper::execute_filter(VExprContext* context, const Block* block,
+                                             uint8_t* __restrict result_filter_data, size_t rows,
+                                             bool accept_null, bool* can_filter_all) const {
+    DCHECK(_open_finished);
+    if (accept_null) {
+        return Status::InternalError(
+                "Runtime filter does not support accept_null in execute_filter");
     }
-    if (_always_true) {
-        size_t size = block->rows();
-        result_column = create_always_true_column(size, _data_type->is_nullable());
-        COUNTER_UPDATE(_always_true_filter_rows, size);
+
+    auto& rf_selectivity = context->get_runtime_filter_selectivity();
+    Defer auto_update_judge_counter = [&]() { rf_selectivity.update_judge_counter(); };
+
+    // if always true, skip evaluate runtime filter
+    if (rf_selectivity.maybe_always_true_can_ignore()) {
+        COUNTER_UPDATE(_always_true_filter_rows, rows);
         return Status::OK();
-    } else {
-        if (_getting_const_col) {
-            _impl->set_getting_const_col(true);
+    }
+
+    ColumnPtr filter_column;
+    ColumnPtr arg_column = nullptr;
+    RETURN_IF_ERROR(_impl->execute_runtime_filter(context, block, filter_column, &arg_column));
+
+    // bloom filter will handle null aware inside itself
+    if (_null_aware && TExprNodeType::BLOOM_PRED != node_type()) {
+        DCHECK(arg_column);
+        change_null_to_true(filter_column->assume_mutable(), arg_column);
+    }
+
+    if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
+        // const(nullable) or const(bool): a NULL or false constant rejects every row that is
+        // still selected, so only those rows count as this filter's input.
+        const size_t input_rows = rows - simd::count_zero_num((int8_t*)result_filter_data, rows);
+        const bool filter_all = const_column->is_null_at(0) || !const_column->get_bool(0);
+        const size_t filter_rows = filter_all ? input_rows : 0;
+
+        COUNTER_UPDATE(_rf_filter_rows, filter_rows);
+        COUNTER_UPDATE(_rf_input_rows, input_rows);
+        rf_selectivity.update_judge_selectivity(_filter_id, filter_rows, input_rows,
+                                                _ignore_thredhold);
+        if (filter_all) {
+            *can_filter_all = true;
+            memset(result_filter_data, 0, rows);
+            return Status::OK();
         }
+    } else if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
+        // nullable(bool)
+        const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
+        const IColumn::Filter& filter = assert_cast<const ColumnUInt8&>(*nested_column).get_data();
+        const auto* __restrict filter_data = filter.data();
+        const auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
+
+        const size_t input_rows = rows - simd::count_zero_num((int8_t*)result_filter_data, rows);
 
-        ColumnPtr arg_column = nullptr;
-        RETURN_IF_ERROR(_impl->execute_runtime_filter(context, block, result_column, &arg_column));
-        if (_getting_const_col) {
-            _impl->set_getting_const_col(false);
+        for (size_t i = 0; i < rows; ++i) {
+            result_filter_data[i] &= (!null_map_data[i]) & filter_data[i];
         }
 
-        // bloom filter will handle null aware inside itself
-        if (_null_aware && TExprNodeType::BLOOM_PRED != node_type()) {
-            DCHECK(arg_column);
-            change_null_to_true(result_column->assume_mutable(), arg_column);
+        const size_t output_rows = rows - simd::count_zero_num((int8_t*)result_filter_data, rows);
+
+        COUNTER_UPDATE(_rf_filter_rows, input_rows - output_rows);
+        COUNTER_UPDATE(_rf_input_rows, input_rows);
+        rf_selectivity.update_judge_selectivity(_filter_id, input_rows - output_rows, input_rows,
+                                                _ignore_thredhold);
+
+        if (output_rows == 0) {
+            *can_filter_all = true;
+            return Status::OK();
         }
+    } else {
+        // bool
+        const IColumn::Filter& filter = assert_cast<const ColumnUInt8&>(*filter_column).get_data();
+        const auto* __restrict filter_data = filter.data();
 
-        return Status::OK();
-    }
-}
+        const size_t input_rows = rows - simd::count_zero_num((int8_t*)result_filter_data, rows);
 
-const std::string& VRuntimeFilterWrapper::expr_name() const {
-    return _expr_name;
+        for (size_t i = 0; i < rows; ++i) {
+            result_filter_data[i] &= filter_data[i];
+        }
+
+        const size_t output_rows = rows - simd::count_zero_num((int8_t*)result_filter_data, rows);
+
+        COUNTER_UPDATE(_rf_filter_rows, input_rows - output_rows);
+        COUNTER_UPDATE(_rf_input_rows, input_rows);
+        rf_selectivity.update_judge_selectivity(_filter_id, input_rows - output_rows, input_rows,
+                                                _ignore_thredhold);
+
+        if (output_rows == 0) {
+            *can_filter_all = true;
+            return Status::OK();
+        }
+    }
+    return Status::OK();
 }
 
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 3535898915b..09bc8a815c7 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -63,6 +63,10 @@ public:
     const std::string& expr_name() const override;
     const VExprSPtrs& children() const override { return _impl->children(); }
 
+    Status execute_filter(VExprContext* context, const Block* block,
+                          uint8_t* __restrict result_filter_data, size_t rows, bool accept_null,
+                          bool* can_filter_all) const override;
+
     uint64_t get_digest(uint64_t seed) const override {
         seed = _impl->get_digest(seed);
         if (seed) {
@@ -91,33 +95,10 @@ public:
         }
     }
 
-    void update_counters(int64_t filter_rows, int64_t input_rows) {
-        COUNTER_UPDATE(_rf_filter_rows, filter_rows);
-        COUNTER_UPDATE(_rf_input_rows, input_rows);
-    }
-
-    template <typename T>
-    static void judge_selectivity(double ignore_threshold, int64_t filter_rows, int64_t input_rows,
-                                  T& always_true) {
-        always_true = static_cast<double>(filter_rows) / static_cast<double>(input_rows) <
-                      ignore_threshold;
-    }
-
     bool is_rf_wrapper() const override { return true; }
 
     int filter_id() const { return _filter_id; }
 
-    void do_judge_selectivity(uint64_t filter_rows, uint64_t input_rows) override {
-        update_counters(filter_rows, input_rows);
-
-        if (!_always_true) {
-            _judge_filter_rows += filter_rows;
-            _judge_input_rows += input_rows;
-            judge_selectivity(_ignore_thredhold, _judge_filter_rows, _judge_input_rows,
-                              _always_true);
-        }
-    }
-
     std::shared_ptr<RuntimeProfile::Counter> predicate_filtered_rows_counter() const {
         return _rf_filter_rows;
     }
@@ -129,26 +110,7 @@ public:
     }
 
 private:
-    void reset_judge_selectivity() const {
-        _always_true = false;
-        _judge_counter = config::runtime_filter_sampling_frequency;
-        _judge_input_rows = 0;
-        _judge_filter_rows = 0;
-    }
-
     VExprSPtr _impl;
-    // VRuntimeFilterWrapper and ColumnPredicate share the same logic,
-    // but it's challenging to unify them, so the code is duplicated.
-    // _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true
-    // are variables used to implement the _always_true logic, calculated periodically
-    // based on runtime_filter_sampling_frequency. During each period, if _always_true
-    // is evaluated as true, the logic for always_true is applied for the rest of that period
-    // without recalculating. At the beginning of the next period,
-    // reset_judge_selectivity is used to reset these variables.
-    mutable std::atomic_int _judge_counter = 0;
-    mutable std::atomic_uint64_t _judge_input_rows = 0;
-    mutable std::atomic_uint64_t _judge_filter_rows = 0;
-    mutable std::atomic_int _always_true = false;
 
     std::shared_ptr<RuntimeProfile::Counter> _rf_input_rows =
             std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
diff --git a/be/test/runtime_filter/runtime_filter_selectivity_test.cpp 
b/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
new file mode 100644
index 00000000000..b8504f950c2
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_selectivity.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+namespace doris {
+
+class RuntimeFilterSelectivityTest : public testing::Test { // Fixture: snapshots config::runtime_filter_sampling_frequency before each test and restores it afterwards
+protected:
+    void SetUp() override {
+        // Save the global sampling frequency; many tests below overwrite it
+        _original_sampling_frequency = 
config::runtime_filter_sampling_frequency;
+    }
+
+    void TearDown() override {
+        // Restore the saved value so one test's config change does not leak into the next
+        config::runtime_filter_sampling_frequency = 
_original_sampling_frequency;
+    }
+
+    int _original_sampling_frequency; // written in SetUp(), read back in TearDown()
+};
+
+TEST_F(RuntimeFilterSelectivityTest, basic_initialization) { // a default-constructed tracker is not always_true
+    RuntimeFilterSelectivity selectivity;
+    // No update has been recorded yet, so the filter must not be reported as ignorable (always_true)
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, disabled_sampling_frequency) { // a sampling frequency of 0 disables the always_true shortcut
+    RuntimeFilterSelectivity selectivity;
+    config::runtime_filter_sampling_frequency = 0; // restored by the fixture's TearDown()
+
+    // Even if conditions are met, should return false when sampling is 
disabled
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1); // 0.04 < 0.1 on enough rows: marks always_true when sampling is enabled (see update_with_low_selectivity)
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, negative_sampling_frequency) { // a negative frequency behaves like 0: the shortcut stays off
+    RuntimeFilterSelectivity selectivity;
+    config::runtime_filter_sampling_frequency = -1; // restored by the fixture's TearDown()
+
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1); // input that would otherwise mark always_true
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_below_threshold) { // static helper: low ratio on enough rows sets the out-flag
+    bool always_true = false;
+    // filter_rows/input_rows = 5/50000 = 0.0001 < 0.1
+    // input_rows (50000) > min_judge_input_rows (40960)
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 5, 50000, always_true); // args: threshold, filter_rows, input_rows, out-flag
+    EXPECT_TRUE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_above_threshold) { // static helper: a ratio above the threshold leaves the out-flag false
+    bool always_true = false;
+    // filter_rows/input_rows = 25000/50000 = 0.5 >= 0.1
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 25000, 50000, 
always_true);
+    EXPECT_FALSE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, 
judge_selectivity_insufficient_input_rows) {
+    bool always_true = false; // static helper: must stay false because there are too few rows to judge
+    // Even though 5/100 = 0.05 < 0.1, input_rows (100) < min_judge_input_rows 
(40960)
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 5, 100, always_true);
+    EXPECT_FALSE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, update_with_low_selectivity) { // one low-ratio update on enough rows marks always_true
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // filter_rows/input_rows = 2000/50000 = 0.04 < 0.1
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1); // first arg is -1 in every test here; presumably a filter id — TODO confirm
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, update_with_high_selectivity) { // one high-ratio update leaves always_true unset
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // filter_rows/input_rows = 45000/50000 = 0.9 >= 0.1
+    selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, once_always_true_stays_true) { // later updates do not clear always_true (no update_judge_counter() call in between)
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // First update: low selectivity (2000/50000 = 0.04 < 0.1)
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+
+    // Second update: high selectivity (45000/50000 = 0.9), but it is ignored once always_true is set
+    selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, accumulated_selectivity_low) { // NOTE(review): only one update is made, so accumulation is not exercised; consider a first update below 40960 rows so the verdict depends on the summed totals — TODO
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // Only update: 1000/50000 = 0.02 < 0.1
+    selectivity.update_judge_selectivity(-1, 1000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, accumulated_selectivity_high) { // the accumulated ratio stays at 0.4, so always_true never turns on
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // First update: 20000/50000 = 0.4
+    selectivity.update_judge_selectivity(-1, 20000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    // Second update: accumulated (20000+20000)/(50000+50000) = 0.4
+    selectivity.update_judge_selectivity(-1, 20000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, counter_triggers_reset) { // NOTE(review): the name does not match the final assertion — see below
+    config::runtime_filter_sampling_frequency = 3;
+    RuntimeFilterSelectivity selectivity;
+
+    // Mark as always_true (2000/50000 = 0.04 < 0.1)
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+
+    // Advance the sampling counter as many times as the frequency (3)
+    selectivity.update_judge_counter(); // call 1
+    selectivity.update_judge_counter(); // call 2
+    selectivity.update_judge_counter(); // call 3 — presumed to trigger a reset, but see the note on the assertion
+
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore()); // NOTE(review): always_true survives 3 calls at frequency 3, so either the reset fires on a later call (off-by-one vs. the comments) or it does not clear the flag — confirm against update_judge_counter() and fix the name/comments or the assertion
+}
+
+TEST_F(RuntimeFilterSelectivityTest, reset_allows_reevaluation) { // NOTE(review): the name does not match the final assertion — see below
+    config::runtime_filter_sampling_frequency = 2;
+    RuntimeFilterSelectivity selectivity;
+
+    // First cycle: mark as always_true
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+
+    // Advance the sampling counter as many times as the frequency (2)
+    selectivity.update_judge_counter(); // call 1
+    selectivity.update_judge_counter(); // call 2
+
+    // Second cycle: now with high selectivity (45000/50000 = 0.9)
+    selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore()); // NOTE(review): still true after a 0.9 ratio, i.e. no re-evaluation happened, contradicting the test name — confirm when the counter reset fires and fix the name or the assertion
+}
+
+TEST_F(RuntimeFilterSelectivityTest, edge_case_zero_rows) { // zero input rows must not yield an always_true verdict
+    bool always_true = false;
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 0, 0, always_true);
+    EXPECT_FALSE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, edge_case_exact_threshold) { // the threshold comparison is strict: a ratio equal to it does not count
+    bool always_true = false;
+    // Exactly at threshold: 5000/50000 = 0.1, NOT less than 0.1
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 5000, 50000, always_true);
+    EXPECT_FALSE(always_true);
+
+    // Just below threshold: 4999/50000 = 0.09998 < 0.1
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 4999, 50000, always_true); // reuses the flag, which is false at this point
+    EXPECT_TRUE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, multiple_updates_before_threshold) { // rows accumulate across updates until there are enough to judge
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // Multiple updates with insufficient rows each time (running total stays below 40960)
+    selectivity.update_judge_selectivity(-1, 100, 1000, 0.1); // 100/1000, 
insufficient
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    selectivity.update_judge_selectivity(-1, 200, 2000, 0.1); // 300/3000, 
insufficient
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    // Now accumulated rows are sufficient: 300+2000 = 2300, 3000+40000 = 43000
+    selectivity.update_judge_selectivity(-1, 2000, 40000, 0.1); // 2300/43000 
= 0.053 < 0.1
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, different_thresholds) { // the ignore threshold is a per-call argument; the same 0.04 ratio passes 0.05 but not 0.03
+    config::runtime_filter_sampling_frequency = 100;
+
+    // Threshold 0.05: ratio 0.04 is below it, so always_true is set
+    {
+        RuntimeFilterSelectivity selectivity;
+        selectivity.update_judge_selectivity(-1, 2000, 50000, 0.05); // 0.04 < 
0.05
+        EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+    }
+
+    // Threshold 0.03: ratio 0.04 is not below it, so always_true stays unset
+    {
+        RuntimeFilterSelectivity selectivity;
+        selectivity.update_judge_selectivity(-1, 2000, 50000, 0.03); // 0.04 
>= 0.03
+        EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+    }
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to