This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit aacd25e62dd4c0a09d61e89abaee44d27b9bc563
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Tue Mar 4 11:16:38 2025 +0800

    test in or bloom (#48596)
---
 be/src/runtime_filter/runtime_filter.h             |  14 +-
 be/src/runtime_filter/runtime_filter_wrapper.cpp   |  33 +-
 be/src/runtime_filter/runtime_filter_wrapper.h     |   6 +-
 .../runtime_filter/runtime_filter_wrapper_test.cpp | 779 +++++++++++++++++++--
 4 files changed, 751 insertions(+), 81 deletions(-)

diff --git a/be/src/runtime_filter/runtime_filter.h 
b/be/src/runtime_filter/runtime_filter.h
index 91883bbcec0..dd63bf9379d 100644
--- a/be/src/runtime_filter/runtime_filter.h
+++ b/be/src/runtime_filter/runtime_filter.h
@@ -66,15 +66,15 @@ public:
 
         if (real_runtime_filter_type == RuntimeFilterType::IN_FILTER) {
             auto in_filter = request->mutable_in_filter();
-            _to_protobuf(in_filter);
+            RETURN_IF_ERROR(_to_protobuf(in_filter));
         } else if (real_runtime_filter_type == 
RuntimeFilterType::BLOOM_FILTER) {
             DCHECK(data != nullptr);
-            _to_protobuf(request->mutable_bloom_filter(), (char**)data, len);
+            RETURN_IF_ERROR(_to_protobuf(request->mutable_bloom_filter(), 
(char**)data, len));
         } else if (real_runtime_filter_type == 
RuntimeFilterType::MINMAX_FILTER ||
                    real_runtime_filter_type == RuntimeFilterType::MIN_FILTER ||
                    real_runtime_filter_type == RuntimeFilterType::MAX_FILTER) {
             auto minmax_filter = request->mutable_minmax_filter();
-            _to_protobuf(minmax_filter);
+            RETURN_IF_ERROR(_to_protobuf(minmax_filter));
         } else {
             return Status::InternalError("not implemented !");
         }
@@ -95,11 +95,11 @@ protected:
     virtual Status _init_with_desc(const TRuntimeFilterDesc* desc, const 
TQueryOptions* options);
 
     template <typename T>
-    void _to_protobuf(T* filter) {
-        _wrapper->to_protobuf(filter);
+    Status _to_protobuf(T* filter) {
+        return _wrapper->to_protobuf(filter);
     }
-    void _to_protobuf(PBloomFilter* filter, char** data, int* filter_length) {
-        _wrapper->to_protobuf(filter, data, filter_length);
+    Status _to_protobuf(PBloomFilter* filter, char** data, int* filter_length) 
{
+        return _wrapper->to_protobuf(filter, data, filter_length);
     }
 
     Status _push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp 
b/be/src/runtime_filter/runtime_filter_wrapper.cpp
index 9aa054c0ec9..8204cc0dd37 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.cpp
+++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp
@@ -64,8 +64,6 @@ Status RuntimeFilterWrapper::_change_to_bloom_filter() {
     }
     if (_bloom_filter_func->get_size() != 0) {
         _bloom_filter_func->insert_set(_hybrid_set);
-    } else if (_hybrid_set != nullptr && _hybrid_set->size() != 0) {
-        return Status::InternalError("change to bloom filter need empty set, 
{}", debug_string());
     }
 
     // release in filter to change real type to bloom
@@ -142,8 +140,8 @@ Status RuntimeFilterWrapper::insert(const 
vectorized::ColumnPtr& column, size_t
         break;
     }
     default:
-        DCHECK(false);
-        break;
+        return Status::InternalError("`RuntimeFilterWrapper::insert` is not 
supported for RF {}",
+                                     int(_filter_type));
     }
     return Status::OK();
 }
@@ -570,10 +568,6 @@ bool RuntimeFilterWrapper::contain_null() const {
         return _bloom_filter_func->contain_null();
     }
     if (_hybrid_set) {
-        if (get_real_type() != RuntimeFilterType::IN_FILTER) {
-            throw Exception(ErrorCode::INTERNAL_ERROR, "rf has hybrid_set but 
real type is {}",
-                            int(get_real_type()));
-        }
         return _hybrid_set->contain_null();
     }
     if (_minmax_func) {
@@ -611,24 +605,41 @@ std::string RuntimeFilterWrapper::debug_string() const {
     return result + "]";
 }
 
-void RuntimeFilterWrapper::to_protobuf(PInFilter* filter) {
+Status RuntimeFilterWrapper::to_protobuf(PInFilter* filter) {
+    if (get_real_type() != RuntimeFilterType::IN_FILTER) {
+        return Status::InternalError("Runtime filter {} cannot serialize to 
PInFilter",
+                                     int(get_real_type()));
+    }
     filter->set_column_type(
             PColumnType::
                     COLUMN_TYPE_BOOL); // set deprecated field coz it is 
required and we can't delete it
     _hybrid_set->to_pb(filter);
+    return Status::OK();
 }
 
-void RuntimeFilterWrapper::to_protobuf(PMinMaxFilter* filter) {
+Status RuntimeFilterWrapper::to_protobuf(PMinMaxFilter* filter) {
+    if (get_real_type() != RuntimeFilterType::MINMAX_FILTER &&
+        get_real_type() != RuntimeFilterType::MIN_FILTER &&
+        get_real_type() != RuntimeFilterType::MAX_FILTER) {
+        return Status::InternalError("Runtime filter {} cannot serialize to 
PMinMaxFilter",
+                                     int(get_real_type()));
+    }
     filter->set_column_type(
             PColumnType::
                     COLUMN_TYPE_BOOL); // set deprecated field coz it is 
required and we can't delete it
     _minmax_func->to_pb(filter);
+    return Status::OK();
 }
 
-void RuntimeFilterWrapper::to_protobuf(PBloomFilter* filter, char** data, int* 
filter_length) {
+Status RuntimeFilterWrapper::to_protobuf(PBloomFilter* filter, char** data, 
int* filter_length) {
+    if (get_real_type() != RuntimeFilterType::BLOOM_FILTER) {
+        return Status::InternalError("Runtime filter {} cannot serialize to 
PBloomFilter",
+                                     int(get_real_type()));
+    }
     _bloom_filter_func->get_data(data, filter_length);
     filter->set_filter_length(*filter_length);
     filter->set_always_true(false);
+    return Status::OK();
 }
 
 template <class T>
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h 
b/be/src/runtime_filter/runtime_filter_wrapper.h
index 5695d9328d7..7cee88fe666 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -76,9 +76,9 @@ public:
     std::shared_ptr<BloomFilterFuncBase> bloom_filter_func() const { return 
_bloom_filter_func; }
     std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func() const { return 
_bitmap_filter_func; }
 
-    void to_protobuf(PInFilter* filter);
-    void to_protobuf(PMinMaxFilter* filter);
-    void to_protobuf(PBloomFilter* filter, char** data, int* filter_length);
+    Status to_protobuf(PInFilter* filter);
+    Status to_protobuf(PMinMaxFilter* filter);
+    Status to_protobuf(PBloomFilter* filter, char** data, int* filter_length);
 
     PrimitiveType column_type() const { return _column_return_type; }
 
diff --git a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp 
b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
index 9d6e5f410e3..7511015c722 100644
--- a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp
@@ -113,7 +113,7 @@ TEST_F(RuntimeFilterWrapperTest, TestIn) {
         EXPECT_TRUE(wrapper->merge(another_wrapper.get()).ok());
         EXPECT_EQ(wrapper->hybrid_set()->size(), 2);
         EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY);
-        wrapper->to_protobuf(valid_request.mutable_in_filter());
+        
EXPECT_TRUE(wrapper->to_protobuf(valid_request.mutable_in_filter()).ok());
 
         // Merge 2 (ignored filter)
         another_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
@@ -241,14 +241,16 @@ TEST_F(RuntimeFilterWrapperTest, TestInAssign) {
                                                                           
value1);                 \
         
get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(in_filter->add_values(),
 \
                                                                           
value2);                 \
-        valid_request.set_contain_null(false);                                 
                    \
+        valid_request.set_contain_null(true);                                  
                    \
         valid_request.set_filter_type(PFilterType::IN_FILTER);                 
                    \
         EXPECT_TRUE(wrapper->assign(valid_request, nullptr).ok());             
                    \
         EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY);   
                    \
         EXPECT_EQ(wrapper->hybrid_set()->size(), 2);                           
                    \
     }
 
-#define APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE) APPLY_FOR_PRIMITIVE_TYPE(TYPE, 0, 
1)
+#define APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE)                                    
           \
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE, 
type_limit<PrimitiveTypeTraits<TYPE>::CppType>::min(), \
+                             
type_limit<PrimitiveTypeTraits<TYPE>::CppType>::max())
 
     APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_BOOLEAN);
     APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_TINYINT);
@@ -274,6 +276,92 @@ TEST_F(RuntimeFilterWrapperTest, TestInAssign) {
     APPLY_FOR_PRIMITIVE_TYPE(TYPE_STRING, StringRef("1"), StringRef("2"));
     APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_IPV4);
     APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_IPV6);
+#undef APPLY_FOR_PRIMITIVE_TYPE
+#undef APPLY_FOR_PRIMITIVE_BASE_TYPE
+}
+
+TEST_F(RuntimeFilterWrapperTest, TestMinMaxAssign) {
+    int32_t filter_id = 0;
+    RuntimeFilterType filter_type = RuntimeFilterType::MINMAX_FILTER;
+    bool null_aware = true;
+
+    int32_t max_in_num = 2;
+
+    int64_t runtime_bloom_filter_min_size = 0;
+    int64_t runtime_bloom_filter_max_size = 0;
+    bool build_bf_by_runtime_size = true;
+    int64_t bloom_filter_size = 0;
+    bool bloom_filter_size_calculated_by_ndv = true;
+    bool enable_fixed_len_to_uint32_v2 = true;
+
+    bool bitmap_filter_not_in = false;
+
+#define APPLY_FOR_PRIMITIVE_TYPE(TYPE, value1, value2)                         
              \
+    {                                                                          
              \
+        static constexpr PrimitiveType column_return_type = 
PrimitiveType::TYPE;             \
+        RuntimeFilterParams params {                                           
              \
+                .filter_id = filter_id,                                        
              \
+                .filter_type = filter_type,                                    
              \
+                .column_return_type = column_return_type,                      
              \
+                .null_aware = null_aware,                                      
              \
+                .max_in_num = max_in_num,                                      
              \
+                .runtime_bloom_filter_min_size = 
runtime_bloom_filter_min_size,              \
+                .runtime_bloom_filter_max_size = 
runtime_bloom_filter_max_size,              \
+                .bloom_filter_size = bloom_filter_size,                        
              \
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,          
              \
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,  \
+                .enable_fixed_len_to_uint32_v2 = 
enable_fixed_len_to_uint32_v2,              \
+                .bitmap_filter_not_in = bitmap_filter_not_in};                 
              \
+        auto wrapper = std::make_shared<RuntimeFilterWrapper>(&params);        
              \
+        PMergeFilterRequest valid_request;                                     
              \
+        auto* minmax_filter = valid_request.mutable_minmax_filter();           
              \
+        minmax_filter->set_column_type(PColumnType::COLUMN_TYPE_BOOL);         
              \
+        get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(     
              \
+                minmax_filter->mutable_min_val(), value1);                     
              \
+        get_convertor<PrimitiveTypeTraits<column_return_type>::CppType>()(     
              \
+                minmax_filter->mutable_max_val(), value2);                     
              \
+        valid_request.set_contain_null(true);                                  
              \
+        valid_request.set_filter_type(PFilterType::MINMAX_FILTER);             
              \
+        EXPECT_TRUE(wrapper->assign(valid_request, nullptr).ok());             
              \
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY);   
              \
+        
EXPECT_EQ(*(PrimitiveTypeTraits<column_return_type>::CppType*)wrapper->minmax_func()
 \
+                           ->get_min(),                                        
              \
+                  value1);                                                     
              \
+        
EXPECT_EQ(*(PrimitiveTypeTraits<column_return_type>::CppType*)wrapper->minmax_func()
 \
+                           ->get_max(),                                        
              \
+                  value2);                                                     
              \
+    }
+
+#define APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE)                                    
           \
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE, 
type_limit<PrimitiveTypeTraits<TYPE>::CppType>::min(), \
+                             
type_limit<PrimitiveTypeTraits<TYPE>::CppType>::max())
+
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_BOOLEAN);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_TINYINT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_SMALLINT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_INT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_BIGINT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_LARGEINT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_FLOAT);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DOUBLE);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DATEV2);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DATETIMEV2);
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_DATETIME, VecDateTimeValue(0, 3, 0, 0, 0, 
2020, 1, 1),
+                             VecDateTimeValue(0, 3, 0, 0, 0, 2020, 1, 2));
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_DATE, VecDateTimeValue(0, 2, 0, 0, 0, 2020, 
1, 1),
+                             VecDateTimeValue(0, 2, 0, 0, 0, 2020, 1, 2));
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_DECIMALV2, DecimalV2Value(1, 1), 
DecimalV2Value(1, 2));
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DECIMAL32);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DECIMAL64);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_DECIMAL128I);
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_DECIMAL256, vectorized::Decimal256(0), 
vectorized::Decimal256(1));
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_VARCHAR, StringRef("1"), StringRef("2"));
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_CHAR, StringRef("1"), StringRef("2"));
+    APPLY_FOR_PRIMITIVE_TYPE(TYPE_STRING, StringRef("1"), StringRef("2"));
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_IPV4);
+    APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_IPV6);
+#undef APPLY_FOR_PRIMITIVE_TYPE
+#undef APPLY_FOR_PRIMITIVE_BASE_TYPE
 }
 
 TEST_F(RuntimeFilterWrapperTest, TestBloom) {
@@ -422,7 +510,7 @@ TEST_F(RuntimeFilterWrapperTest, TestBloom) {
         valid_request.set_filter_id(filter_id);
         char* data = nullptr;
         int len = 0;
-        wrapper->to_protobuf(valid_request.mutable_bloom_filter(), &data, 
&len);
+        EXPECT_TRUE(wrapper->to_protobuf(valid_request.mutable_bloom_filter(), 
&data, &len).ok());
 
         const auto str = std::string(data, len);
         butil::IOBuf io_buf;
@@ -513,71 +601,203 @@ TEST_F(RuntimeFilterWrapperTest, TestMinMax) {
 
     bool bitmap_filter_not_in = false;
 
-    RuntimeFilterParams params {
-            .filter_id = filter_id,
-            .filter_type = filter_type,
-            .column_return_type = column_return_type,
-            .null_aware = null_aware,
-            .max_in_num = max_in_num,
-            .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
-            .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
-            .bloom_filter_size = bloom_filter_size,
-            .build_bf_by_runtime_size = build_bf_by_runtime_size,
-            .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
-            .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
-            .bitmap_filter_not_in = bitmap_filter_not_in};
-
     std::shared_ptr<RuntimeFilterWrapper> wrapper;
+    // MinMax
     {
-        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
-        EXPECT_TRUE(wrapper->init(80).ok());
-        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
-        EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), 
type_limit<int>::min());
-        EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), 
type_limit<int>::max());
-    }
-    {
-        // Insert
-        auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector);
-        EXPECT_TRUE(wrapper->insert(col, 0).ok());
-        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
-        EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), min_val + num_vals 
- 1);
-        EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), min_val);
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        {
+            wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+            EXPECT_TRUE(wrapper->init(80).ok());
+            EXPECT_EQ(wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), 
type_limit<int>::min());
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), 
type_limit<int>::max());
+        }
+        {
+            // Insert
+            auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector);
+            EXPECT_TRUE(wrapper->insert(col, 0).ok());
+            EXPECT_EQ(wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), min_val + 
num_vals - 1);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), min_val);
+        }
+        {
+            PMergeFilterRequest valid_request;
+            valid_request.set_contain_null(false);
+            valid_request.set_filter_type(PFilterType::MINMAX_FILTER);
+            valid_request.set_filter_id(filter_id);
+            
EXPECT_TRUE(wrapper->to_protobuf(valid_request.mutable_minmax_filter()).ok());
+
+            auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+            EXPECT_TRUE(new_wrapper->assign(valid_request, nullptr).ok());
+
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(),
+                      *(int*)new_wrapper->minmax_func()->get_max());
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(),
+                      *(int*)new_wrapper->minmax_func()->get_min());
+        }
+        {
+            auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+            EXPECT_TRUE(new_wrapper->init(12312).ok());
+            EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            // Insert
+            std::vector<int> new_data_vector {-100, 100};
+            auto col = 
vectorized::ColumnHelper::create_column<DataType>(new_data_vector);
+            EXPECT_TRUE(new_wrapper->insert(col, 0).ok());
+            EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            new_wrapper->_state = RuntimeFilterWrapper::State::READY;
+            // Merge
+            EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok());
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), 100);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), -100);
+        }
+        EXPECT_EQ(wrapper->filter_id(), filter_id);
+        EXPECT_TRUE(wrapper->is_valid());
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::MINMAX_FILTER);
+        EXPECT_EQ(wrapper->column_type(), column_return_type);
+        EXPECT_EQ(wrapper->contain_null(), false);
     }
+    // Min
     {
-        PMergeFilterRequest valid_request;
-        valid_request.set_contain_null(false);
-        valid_request.set_filter_type(PFilterType::MINMAX_FILTER);
-        valid_request.set_filter_id(filter_id);
-        wrapper->to_protobuf(valid_request.mutable_minmax_filter());
+        filter_type = RuntimeFilterType::MIN_FILTER;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        {
+            wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+            EXPECT_TRUE(wrapper->init(80).ok());
+            EXPECT_EQ(wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), 
type_limit<int>::min());
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), 
type_limit<int>::max());
+        }
+        {
+            // Insert
+            auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector);
+            EXPECT_TRUE(wrapper->insert(col, 0).ok());
+            EXPECT_EQ(wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), min_val + 
num_vals - 1);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), min_val);
+        }
+        {
+            PMergeFilterRequest valid_request;
+            valid_request.set_contain_null(false);
+            valid_request.set_filter_type(PFilterType::MIN_FILTER);
+            valid_request.set_filter_id(filter_id);
+            
EXPECT_TRUE(wrapper->to_protobuf(valid_request.mutable_minmax_filter()).ok());
 
-        auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
-        EXPECT_TRUE(new_wrapper->assign(valid_request, nullptr).ok());
+            auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+            EXPECT_TRUE(new_wrapper->assign(valid_request, nullptr).ok());
 
-        EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(),
-                  *(int*)new_wrapper->minmax_func()->get_max());
-        EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(),
-                  *(int*)new_wrapper->minmax_func()->get_min());
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(),
+                      *(int*)new_wrapper->minmax_func()->get_min());
+        }
+        {
+            auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+            EXPECT_TRUE(new_wrapper->init(12312).ok());
+            EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            // Insert
+            std::vector<int> new_data_vector {-100, 100};
+            auto col = 
vectorized::ColumnHelper::create_column<DataType>(new_data_vector);
+            EXPECT_TRUE(new_wrapper->insert(col, 0).ok());
+            EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            new_wrapper->_state = RuntimeFilterWrapper::State::READY;
+            // Merge
+            EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok());
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), 100);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), -100);
+        }
+        EXPECT_EQ(wrapper->filter_id(), filter_id);
+        EXPECT_TRUE(wrapper->is_valid());
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::MIN_FILTER);
+        EXPECT_EQ(wrapper->column_type(), column_return_type);
+        EXPECT_EQ(wrapper->contain_null(), false);
     }
+    // Max
     {
-        auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
-        EXPECT_TRUE(new_wrapper->init(12312).ok());
-        EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
-        // Insert
-        std::vector<int> new_data_vector {-100, 100};
-        auto col = 
vectorized::ColumnHelper::create_column<DataType>(new_data_vector);
-        EXPECT_TRUE(new_wrapper->insert(col, 0).ok());
-        EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
-        new_wrapper->_state = RuntimeFilterWrapper::State::READY;
-        // Merge
-        EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok());
-        EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), 100);
-        EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), -100);
+        filter_type = RuntimeFilterType::MAX_FILTER;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        {
+            wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+            EXPECT_TRUE(wrapper->init(80).ok());
+            EXPECT_EQ(wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), 
type_limit<int>::min());
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), 
type_limit<int>::max());
+        }
+        {
+            // Insert
+            auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector);
+            EXPECT_TRUE(wrapper->insert(col, 0).ok());
+            EXPECT_EQ(wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), min_val + 
num_vals - 1);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), min_val);
+        }
+        {
+            PMergeFilterRequest valid_request;
+            valid_request.set_contain_null(false);
+            valid_request.set_filter_type(PFilterType::MAX_FILTER);
+            valid_request.set_filter_id(filter_id);
+            
EXPECT_TRUE(wrapper->to_protobuf(valid_request.mutable_minmax_filter()).ok());
+
+            auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+            EXPECT_TRUE(new_wrapper->assign(valid_request, nullptr).ok());
+
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(),
+                      *(int*)new_wrapper->minmax_func()->get_max());
+        }
+        {
+            auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+            EXPECT_TRUE(new_wrapper->init(12312).ok());
+            EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            // Insert
+            std::vector<int> new_data_vector {-100, 100};
+            auto col = 
vectorized::ColumnHelper::create_column<DataType>(new_data_vector);
+            EXPECT_TRUE(new_wrapper->insert(col, 0).ok());
+            EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+            new_wrapper->_state = RuntimeFilterWrapper::State::READY;
+            // Merge
+            EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok());
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_max(), 100);
+            EXPECT_EQ(*(int*)wrapper->minmax_func()->get_min(), -100);
+        }
+        EXPECT_EQ(wrapper->filter_id(), filter_id);
+        EXPECT_TRUE(wrapper->is_valid());
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::MAX_FILTER);
+        EXPECT_EQ(wrapper->column_type(), column_return_type);
+        EXPECT_EQ(wrapper->contain_null(), false);
     }
-    EXPECT_EQ(wrapper->filter_id(), filter_id);
-    EXPECT_TRUE(wrapper->is_valid());
-    EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::MINMAX_FILTER);
-    EXPECT_EQ(wrapper->column_type(), column_return_type);
-    EXPECT_EQ(wrapper->contain_null(), false);
 }
 
 TEST_F(RuntimeFilterWrapperTest, TestBitMap) {
@@ -642,6 +862,45 @@ TEST_F(RuntimeFilterWrapperTest, TestBitMap) {
         wrapper->_state = RuntimeFilterWrapper::State::READY;
         EXPECT_TRUE(new_wrapper->merge(wrapper.get()).ok());
     }
+    {
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_TRUE(wrapper->init(80).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(wrapper->bitmap_filter_func()->size(), 0);
+        // Insert nullable column
+        vectorized::DataTypePtr 
bitmap_data_type(std::make_shared<vectorized::DataTypeNullable>(
+                std::make_shared<vectorized::DataTypeBitMap>()));
+        auto bitmap_column = bitmap_data_type->create_column();
+        std::vector<BitmapValue>& container =
+                
((vectorized::ColumnBitmap*)((vectorized::ColumnNullable*)bitmap_column.get())
+                         ->get_nested_column_ptr()
+                         .get())
+                        ->get_data();
+        auto& null_map =
+                
((vectorized::ColumnUInt8*)((vectorized::ColumnNullable*)bitmap_column.get())
+                         ->get_null_map_column_ptr()
+                         .get())
+                        ->get_data();
+
+        for (int i = 0; i < 1024; ++i) {
+            BitmapValue bv;
+            bv.add(i);
+            container.push_back(bv);
+            null_map.push_back(i % 3 == 0);
+        }
+
+        EXPECT_TRUE(wrapper->insert(std::move(bitmap_column), 0).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+
+        // merge
+        std::shared_ptr<RuntimeFilterWrapper> new_wrapper;
+        new_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_TRUE(new_wrapper->init(80).ok());
+        EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(new_wrapper->bitmap_filter_func()->size(), 0);
+        wrapper->_state = RuntimeFilterWrapper::State::READY;
+        EXPECT_TRUE(new_wrapper->merge(wrapper.get()).ok());
+    }
     {
         PMergeFilterRequest valid_request;
         auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
@@ -652,6 +911,406 @@ TEST_F(RuntimeFilterWrapperTest, TestBitMap) {
     EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::BITMAP_FILTER);
     EXPECT_EQ(wrapper->column_type(), column_return_type);
     EXPECT_EQ(wrapper->contain_null(), false);
+    EXPECT_FALSE(wrapper->_change_to_bloom_filter().ok());
+}
+
+TEST_F(RuntimeFilterWrapperTest, TestInOrBloom) {
+    std::vector<int> data_vector(10);
+    std::iota(data_vector.begin(), data_vector.end(), 0);
+    using DataType = vectorized::DataTypeInt32;
+    int32_t filter_id = 0;
+    auto runtime_size = 80;
+    RuntimeFilterType filter_type = RuntimeFilterType::IN_OR_BLOOM_FILTER;
+    bool null_aware = false;
+    PrimitiveType column_return_type = PrimitiveType::TYPE_INT;
+
+    int32_t max_in_num = 64;
+
+    int64_t runtime_bloom_filter_min_size = 64;
+    int64_t runtime_bloom_filter_max_size = 128;
+    bool build_bf_by_runtime_size = false;
+    int64_t bloom_filter_size = 64;
+    bool bloom_filter_size_calculated_by_ndv = false;
+    bool enable_fixed_len_to_uint32_v2 = true;
+
+    bool bitmap_filter_not_in = false;
+
+    std::shared_ptr<RuntimeFilterWrapper> wrapper;
+    {
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length, 
bloom_filter_size);
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        // Init (change to bloom filter)
+        EXPECT_TRUE(wrapper->init(runtime_size).ok());
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::BLOOM_FILTER);
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_alloced, 
bloom_filter_size);
+        // Insert
+        auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        EXPECT_TRUE(wrapper->insert(col, 0).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+
+        col = vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        std::vector<uint8_t> res(10);
+        wrapper->bloom_filter_func()->find_fixed_len(col, res.data());
+        EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool 
{ return i; }));
+    }
+    {
+        runtime_size = 32;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length, 
bloom_filter_size);
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        // Init (keep in filter)
+        EXPECT_TRUE(wrapper->init(runtime_size).ok());
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        // Insert
+        auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        EXPECT_TRUE(wrapper->insert(col, 0).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(wrapper->hybrid_set()->size(), col->size());
+    }
+    {
+        // In + In -> In
+        max_in_num = 128;
+        runtime_size = 16;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        EXPECT_TRUE(wrapper->init(runtime_size).ok());
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        RuntimeFilterParams new_params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&new_params);
+        EXPECT_EQ(new_wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        EXPECT_TRUE(new_wrapper->init(runtime_size).ok());
+        EXPECT_EQ(new_wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        // Insert
+        auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        EXPECT_TRUE(wrapper->insert(col, 0).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(wrapper->hybrid_set()->size(), col->size());
+
+        std::vector<int> new_data_vector(10);
+        std::iota(new_data_vector.begin(), new_data_vector.end(), 10);
+        col = 
vectorized::ColumnHelper::create_column<DataType>(new_data_vector);
+        EXPECT_TRUE(new_wrapper->insert(col, 0).ok());
+        EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(new_wrapper->hybrid_set()->size(), col->size());
+        new_wrapper->_state = RuntimeFilterWrapper::State::READY;
+        // Merge
+        EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok());
+        EXPECT_EQ(wrapper->hybrid_set()->size(), col->size() * 2);
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+    }
+    {
+        // In + Bloom -> Bloom
+        max_in_num = 18;
+        runtime_size = 16;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        EXPECT_TRUE(wrapper->init(runtime_size).ok());
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+
+        max_in_num = 8;
+        runtime_size = 16;
+        RuntimeFilterParams new_params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&new_params);
+        EXPECT_EQ(new_wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        EXPECT_TRUE(new_wrapper->init(runtime_size).ok());
+        EXPECT_EQ(new_wrapper->get_real_type(), 
RuntimeFilterType::BLOOM_FILTER);
+        // Insert
+        auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        EXPECT_TRUE(wrapper->insert(col, 0).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(wrapper->hybrid_set()->size(), data_vector.size());
+
+        std::vector<int> new_data_vector(10);
+        std::iota(new_data_vector.begin(), new_data_vector.end(), 10);
+        col = 
vectorized::ColumnHelper::create_column<DataType>(new_data_vector);
+        EXPECT_TRUE(new_wrapper->insert(col, 0).ok());
+        EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+        std::vector<uint8_t> res(10);
+        new_wrapper->bloom_filter_func()->find_fixed_len(col, res.data());
+        EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool 
{ return i; }));
+        new_wrapper->_state = RuntimeFilterWrapper::State::READY;
+        // Merge
+        EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok());
+
+        std::vector<int> final_data_vector(20);
+        std::iota(final_data_vector.begin(), final_data_vector.end(), 0);
+        col = 
vectorized::ColumnHelper::create_column<DataType>(final_data_vector);
+        std::vector<uint8_t> final_res(20);
+        wrapper->bloom_filter_func()->find_fixed_len(col, final_res.data());
+        EXPECT_TRUE(std::all_of(final_res.begin(), final_res.end(),
+                                [](uint8_t i) -> bool { return i; }));
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::BLOOM_FILTER);
+    }
+    {
+        // Bloom + In -> Bloom
+        max_in_num = 8;
+        runtime_size = 16;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        EXPECT_TRUE(wrapper->init(runtime_size).ok());
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::BLOOM_FILTER);
+
+        max_in_num = 32;
+        runtime_size = 16;
+        RuntimeFilterParams new_params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&new_params);
+        EXPECT_EQ(new_wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        EXPECT_TRUE(new_wrapper->init(runtime_size).ok());
+        EXPECT_EQ(new_wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        // Insert
+        auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        EXPECT_TRUE(wrapper->insert(col, 0).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        col = vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        std::vector<uint8_t> res(10);
+        wrapper->bloom_filter_func()->find_fixed_len(col, res.data());
+        EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool 
{ return i; }));
+
+        std::vector<int> new_data_vector(10);
+        std::iota(new_data_vector.begin(), new_data_vector.end(), 10);
+        col = 
vectorized::ColumnHelper::create_column<DataType>(new_data_vector);
+        EXPECT_TRUE(new_wrapper->insert(col, 0).ok());
+        EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+        EXPECT_EQ(new_wrapper->hybrid_set()->size(), col->size());
+        new_wrapper->_state = RuntimeFilterWrapper::State::READY;
+        // Merge
+        EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok());
+
+        std::vector<int> final_data_vector(20);
+        std::iota(final_data_vector.begin(), final_data_vector.end(), 0);
+        col = 
vectorized::ColumnHelper::create_column<DataType>(final_data_vector);
+        std::vector<uint8_t> final_res(20);
+        wrapper->bloom_filter_func()->find_fixed_len(col, final_res.data());
+        EXPECT_TRUE(std::all_of(final_res.begin(), final_res.end(),
+                                [](uint8_t i) -> bool { return i; }));
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::BLOOM_FILTER);
+    }
+    {
+        // Bloom + Bloom -> Bloom
+        max_in_num = 8;
+        runtime_size = 16;
+        RuntimeFilterParams params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        wrapper = std::make_shared<RuntimeFilterWrapper>(&params);
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        EXPECT_TRUE(wrapper->init(runtime_size).ok());
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::BLOOM_FILTER);
+
+        RuntimeFilterParams new_params {
+                .filter_id = filter_id,
+                .filter_type = filter_type,
+                .column_return_type = column_return_type,
+                .null_aware = null_aware,
+                .max_in_num = max_in_num,
+                .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+                .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+                .bloom_filter_size = bloom_filter_size,
+                .build_bf_by_runtime_size = build_bf_by_runtime_size,
+                .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+                .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+                .bitmap_filter_not_in = bitmap_filter_not_in};
+        auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&new_params);
+        EXPECT_EQ(new_wrapper->get_real_type(), RuntimeFilterType::IN_FILTER);
+        EXPECT_TRUE(new_wrapper->init(runtime_size).ok());
+        EXPECT_EQ(new_wrapper->get_real_type(), 
RuntimeFilterType::BLOOM_FILTER);
+        // Insert
+        auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        EXPECT_TRUE(wrapper->insert(col, 0).ok());
+        EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED);
+        col = vectorized::ColumnHelper::create_column<DataType>(data_vector);
+        std::vector<uint8_t> res(10);
+        wrapper->bloom_filter_func()->find_fixed_len(col, res.data());
+        EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool 
{ return i; }));
+
+        std::vector<int> new_data_vector(10);
+        std::iota(new_data_vector.begin(), new_data_vector.end(), 10);
+        col = 
vectorized::ColumnHelper::create_column<DataType>(new_data_vector);
+        EXPECT_TRUE(new_wrapper->insert(col, 0).ok());
+        EXPECT_EQ(new_wrapper->get_state(), 
RuntimeFilterWrapper::State::UNINITED);
+        col = 
vectorized::ColumnHelper::create_column<DataType>(new_data_vector);
+        new_wrapper->bloom_filter_func()->find_fixed_len(col, res.data());
+        EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool 
{ return i; }));
+        new_wrapper->_state = RuntimeFilterWrapper::State::READY;
+        // Merge
+        EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok());
+
+        std::vector<int> final_data_vector(20);
+        std::iota(final_data_vector.begin(), final_data_vector.end(), 0);
+        col = 
vectorized::ColumnHelper::create_column<DataType>(final_data_vector);
+        std::vector<uint8_t> final_res(20);
+        wrapper->bloom_filter_func()->find_fixed_len(col, final_res.data());
+        EXPECT_TRUE(std::all_of(final_res.begin(), final_res.end(),
+                                [](uint8_t i) -> bool { return i; }));
+        EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::BLOOM_FILTER);
+    }
+    EXPECT_EQ(wrapper->filter_id(), filter_id);
+    EXPECT_TRUE(wrapper->is_valid());
+    EXPECT_EQ(wrapper->column_type(), column_return_type);
+    EXPECT_EQ(wrapper->contain_null(), false);
+}
+
+TEST_F(RuntimeFilterWrapperTest, TestErrorPath) {
+    using DataType = vectorized::DataTypeInt32;
+    int32_t filter_id = 0;
+    RuntimeFilterType filter_type = RuntimeFilterType::UNKNOWN_FILTER;
+    bool null_aware = false;
+    PrimitiveType column_return_type = PrimitiveType::TYPE_INT;
+
+    int32_t max_in_num = 64;
+
+    int64_t runtime_bloom_filter_min_size = 64;
+    int64_t runtime_bloom_filter_max_size = 128;
+    bool build_bf_by_runtime_size = false;
+    int64_t bloom_filter_size = 64;
+    bool bloom_filter_size_calculated_by_ndv = false;
+    bool enable_fixed_len_to_uint32_v2 = true;
+
+    bool bitmap_filter_not_in = false;
+    RuntimeFilterParams params {
+            .filter_id = filter_id,
+            .filter_type = filter_type,
+            .column_return_type = column_return_type,
+            .null_aware = null_aware,
+            .max_in_num = max_in_num,
+            .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size,
+            .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size,
+            .bloom_filter_size = bloom_filter_size,
+            .build_bf_by_runtime_size = build_bf_by_runtime_size,
+            .bloom_filter_size_calculated_by_ndv = 
bloom_filter_size_calculated_by_ndv,
+            .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2,
+            .bitmap_filter_not_in = bitmap_filter_not_in};
+    std::shared_ptr<RuntimeFilterWrapper> wrapper = 
std::make_shared<RuntimeFilterWrapper>(&params);
+    auto col = vectorized::ColumnHelper::create_column<DataType>({0, 1, 2, 3, 
4, 5, 6, 7, 8, 9});
+    EXPECT_EQ(wrapper->insert(col, 0).code(), ErrorCode::INTERNAL_ERROR);
+    {
+        std::shared_ptr<RuntimeFilterWrapper> another_wrapper =
+                std::make_shared<RuntimeFilterWrapper>(&params);
+        another_wrapper->_state = RuntimeFilterWrapper::State::READY;
+        EXPECT_EQ(wrapper->merge(another_wrapper.get()).code(), 
ErrorCode::INTERNAL_ERROR);
+    }
+    PMergeFilterRequest valid_request;
+    EXPECT_FALSE(wrapper->to_protobuf(valid_request.mutable_in_filter()).ok());
+    
EXPECT_FALSE(wrapper->to_protobuf(valid_request.mutable_minmax_filter()).ok());
+    EXPECT_FALSE(wrapper->to_protobuf(valid_request.mutable_bloom_filter(), 
nullptr, nullptr).ok());
+    EXPECT_EQ(wrapper->minmax_func(), nullptr);
+    EXPECT_EQ(wrapper->bloom_filter_func(), nullptr);
+    EXPECT_EQ(wrapper->bitmap_filter_func(), nullptr);
+    EXPECT_EQ(wrapper->hybrid_set(), nullptr);
 }
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to