This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5640bdcf6d2a0534839c3cc0e77c980411764a07
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Mon Mar 3 19:49:50 2025 +0800

    [runtime filter](UT) test bloom filter (#48575)
---
 be/src/exprs/bloom_filter_func.h                   |   6 +-
 be/src/runtime_filter/runtime_filter_wrapper.cpp   |   5 +-
 .../runtime_filter/runtime_filter_wrapper_test.cpp | 220 +++++++++++++++++++++
 3 files changed, 224 insertions(+), 7 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 774cf42e1e6..3df8be080ad 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -45,10 +45,8 @@ public:
     Status init_with_fixed_length(size_t runtime_size) {
         if (_build_bf_by_runtime_size) {
             // Use the same algorithm as org.apache.doris.planner.RuntimeFilter#calculateFilterSize
-            constexpr double fpp = 0.05;
-            constexpr double k = 8; // BUCKET_WORDS
             // m is the number of bits we would need to get the fpp specified
-            double m = -k * runtime_size / std::log(1 - std::pow(fpp, 1.0 / k));
+            double m = -K * runtime_size / std::log(1 - std::pow(FPP, 1.0 / K));
 
             // Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
             int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2))));
@@ -143,6 +141,8 @@ public:
                                                 bool is_parse_column) = 0;
 
 private:
+    static constexpr double FPP = 0.05;
+    static constexpr double K = 8; // BUCKET_WORDS
     void _limit_length() {
         if (_runtime_bloom_filter_min_size > 0) {
             _bloom_filter_length = std::max(_bloom_filter_length, _runtime_bloom_filter_min_size);
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp b/be/src/runtime_filter/runtime_filter_wrapper.cpp
index a341021bc1c..09a03a10164 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.cpp
+++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp
@@ -32,10 +32,7 @@ RuntimeFilterWrapper::RuntimeFilterWrapper(const RuntimeFilterParams* params)
     }
     // Only use in nested loop join not need set null aware
     case RuntimeFilterType::MIN_FILTER:
-    case RuntimeFilterType::MAX_FILTER: {
-        _minmax_func.reset(create_minmax_filter(_column_return_type, params->null_aware));
-        return;
-    }
+    case RuntimeFilterType::MAX_FILTER:
     case RuntimeFilterType::MINMAX_FILTER: {
         _minmax_func.reset(create_minmax_filter(_column_return_type, params->null_aware));
         return;
diff --git a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
index d15d78f70ad..a825221a49a 100644
--- a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
@@ -20,6 +20,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "exprs/bloom_filter_func.h"
 #include "exprs/hybrid_set.h"
 #include "testutil/column_helper.h"
 #include "vec/data_types/data_type_number.h"
@@ -72,6 +73,7 @@ TEST_F(RuntimeFilterWrapperTest, TestIn) {
     EXPECT_EQ(wrapper->bloom_filter_func(), nullptr);
     EXPECT_EQ(wrapper->bitmap_filter_func(), nullptr);
     EXPECT_NE(wrapper->hybrid_set(), nullptr);
+    EXPECT_FALSE(wrapper->build_bf_by_runtime_size());
     {
         // Init
         EXPECT_TRUE(wrapper->init(2).ok());
@@ -191,6 +193,9 @@ TEST_F(RuntimeFilterWrapperTest, TestIn) {
         EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY);
         EXPECT_EQ(wrapper->hybrid_set()->size(), 2);
     }
+    EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+    EXPECT_EQ(wrapper->column_type(), column_return_type);
+    EXPECT_EQ(wrapper->contain_null(), false);
 }
 
 TEST_F(RuntimeFilterWrapperTest, TestInAssign) {
@@ -268,4 +273,219 @@ TEST_F(RuntimeFilterWrapperTest, TestInAssign) {
     APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_IPV6);
 }
 
+TEST_F(RuntimeFilterWrapperTest, TestBloom) {
+    std::vector<int> data_vector(10);
+    std::iota(data_vector.begin(), data_vector.end(), 0);
+    using DataType = vectorized::DataTypeInt32;
+    int32_t filter_id = 0;
+    auto runtime_size = 80;
+    RuntimeFilterType filter_type = RuntimeFilterType::BLOOM_FILTER;
+    bool null_aware = false;
+    PrimitiveType column_return_type = PrimitiveType::TYPE_INT;
+
+    int32_t max_in_num = 0;
+
+    int64_t runtime_bloom_filter_min_size = 64;
+    int64_t runtime_bloom_filter_max_size = 128;
+    bool build_bf_by_runtime_size = false;
+    int64_t bloom_filter_size = 0;
+    bool bloom_filter_size_calculated_by_ndv = true;
+    bool enable_fixed_len_to_uint32_v2 = true;
+
+    bool bitmap_filter_not_in = false;
+
+    std::shared_ptr<RuntimeFilterWrapper> wrapper;
+    {
+        bloom_filter_size = 256;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_FALSE(wrapper->build_bf_by_runtime_size());
+        EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length,
+                  runtime_bloom_filter_max_size);
+    }
+    {
+        bloom_filter_size = 32;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length,
+                  runtime_bloom_filter_min_size);
+        // Init (set BF size by estimated size from FE)
+        EXPECT_TRUE(wrapper->init(80).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length,
+                  runtime_bloom_filter_min_size);
+    }
+    {
+        build_bf_by_runtime_size = true;
+        bloom_filter_size = 32;
+        bloom_filter_size_calculated_by_ndv = false;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_TRUE(wrapper->build_bf_by_runtime_size());
+        EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length,
+                  runtime_bloom_filter_min_size);
+        // Init (set BF size by exact size from BE)
+        double m = -BloomFilterFuncBase::K * runtime_size /
+                   std::log(1 - std::pow(BloomFilterFuncBase::FPP, 1.0 / BloomFilterFuncBase::K));
+        int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2))));
+        auto be_calculate_size = (((int64_t)1) << log_filter_size);
+        EXPECT_TRUE(wrapper->init(runtime_size).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length, be_calculate_size);
+    }
+    {
+        build_bf_by_runtime_size = true;
+        bloom_filter_size = 32;
+        bloom_filter_size_calculated_by_ndv = true;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length,
+                  runtime_bloom_filter_min_size);
+        // Init (set BF size by exact size from BE)
+        auto runtime_size = 80;
+        double m = -BloomFilterFuncBase::K * runtime_size /
+                   std::log(1 - std::pow(BloomFilterFuncBase::FPP, 1.0 / BloomFilterFuncBase::K));
+        int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2))));
+        auto be_calculate_size = (((int64_t)1) << log_filter_size);
+        // Init (set BF size by min size of exact size from BE and the estimated size from FE)
+        EXPECT_TRUE(wrapper->init(runtime_size).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length,
+                  std::min(be_calculate_size, runtime_bloom_filter_min_size));
+    }
+    {
+        // Insert
+        auto col = vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        EXPECT_TRUE(wrapper->insert(col, 0).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+
+        col = vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        std::vector<uint8_t> res(10);
+        wrapper->bloom_filter_func()->find_fixed_len(col, res.data());
+        EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool { return i; }));
+    }
+    {
+        PMergeFilterRequest valid_request;
+        valid_request.set_contain_null(false);
+        valid_request.set_filter_type(PFilterType::BLOOM_FILTER);
+        valid_request.set_filter_id(filter_id);
+        char* data = nullptr;
+        int len = 0;
+        wrapper->to_protobuf(valid_request.mutable_bloom_filter(), &data, &len);
+
+        const auto str = std::string(data, len);
+        butil::IOBuf io_buf;
+        io_buf.operator=(str);
+        butil::IOBufAsZeroCopyInputStream stream(io_buf);
+        RuntimeFilterParams new_params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&new_params);
+        EXPECT_TRUE(new_wrapper->assign(valid_request, &stream).ok());
+
+        auto col = vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        std::vector<uint8_t> res(10);
+        wrapper->bloom_filter_func()->find_fixed_len(col, res.data());
+        EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool { return i; }));
+    }
+    {
+        RuntimeFilterParams new_params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&new_params);
+        EXPECT_TRUE(new_wrapper->init(runtime_size).ok());
+        EXPECT_EQ(new_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(new_wrapper->bloom_filter_func()->_bloom_filter_length,
+                  wrapper->bloom_filter_func()->_bloom_filter_length);
+        // Insert
+        std::vector<int> new_data_vector(10);
+        std::iota(new_data_vector.begin(), new_data_vector.end(), 10);
+        auto col = vectorized::ColumnHelper::create_column<DataType>(new_data_vector);
+        EXPECT_TRUE(new_wrapper->insert(col, 0).ok());
+        EXPECT_EQ(new_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        new_wrapper->_state = RuntimeFilterWrapper::State::READY;
+        // Merge
+        std::vector<int> res_data_vector(20);
+        std::iota(res_data_vector.begin(), res_data_vector.end(), 0);
+        EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok());
+        col = vectorized::ColumnHelper::create_column<DataType>(res_data_vector);
+        std::vector<uint8_t> res(20);
+        wrapper->bloom_filter_func()->find_fixed_len(col, res.data());
+        EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool { return i; }));
+    }
+    EXPECT_EQ(wrapper->filter_id(), filter_id);
+    EXPECT_TRUE(wrapper->is_valid());
+    EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::BLOOM_FILTER);
+    EXPECT_EQ(wrapper->column_type(), column_return_type);
+    EXPECT_EQ(wrapper->contain_null(), false);
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to