This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5640bdcf6d2a0534839c3cc0e77c980411764a07 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Mon Mar 3 19:49:50 2025 +0800 [runtime filter](UT) test bloom filter (#48575) --- be/src/exprs/bloom_filter_func.h | 6 +- be/src/runtime_filter/runtime_filter_wrapper.cpp | 5 +- .../runtime_filter/runtime_filter_wrapper_test.cpp | 220 +++++++++++++++++++++ 3 files changed, 224 insertions(+), 7 deletions(-) diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index 774cf42e1e6..3df8be080ad 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -45,10 +45,8 @@ public: Status init_with_fixed_length(size_t runtime_size) { if (_build_bf_by_runtime_size) { // Use the same algorithm as org.apache.doris.planner.RuntimeFilter#calculateFilterSize - constexpr double fpp = 0.05; - constexpr double k = 8; // BUCKET_WORDS // m is the number of bits we would need to get the fpp specified - double m = -k * runtime_size / std::log(1 - std::pow(fpp, 1.0 / k)); + double m = -K * runtime_size / std::log(1 - std::pow(FPP, 1.0 / K)); // Handle case where ndv == 1 => ceil(log2(m/8)) < 0. 
int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2)))); @@ -143,6 +141,8 @@ public: bool is_parse_column) = 0; private: + static constexpr double FPP = 0.05; + static constexpr double K = 8; // BUCKET_WORDS void _limit_length() { if (_runtime_bloom_filter_min_size > 0) { _bloom_filter_length = std::max(_bloom_filter_length, _runtime_bloom_filter_min_size); diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp b/be/src/runtime_filter/runtime_filter_wrapper.cpp index a341021bc1c..09a03a10164 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.cpp +++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp @@ -32,10 +32,7 @@ RuntimeFilterWrapper::RuntimeFilterWrapper(const RuntimeFilterParams* params) } // Only use in nested loop join not need set null aware case RuntimeFilterType::MIN_FILTER: - case RuntimeFilterType::MAX_FILTER: { - _minmax_func.reset(create_minmax_filter(_column_return_type, params->null_aware)); - return; - } + case RuntimeFilterType::MAX_FILTER: case RuntimeFilterType::MINMAX_FILTER: { _minmax_func.reset(create_minmax_filter(_column_return_type, params->null_aware)); return; diff --git a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp index d15d78f70ad..a825221a49a 100644 --- a/be/test/runtime_filter/runtime_filter_wrapper_test.cpp +++ b/be/test/runtime_filter/runtime_filter_wrapper_test.cpp @@ -20,6 +20,7 @@ #include <glog/logging.h> #include <gtest/gtest.h> +#include "exprs/bloom_filter_func.h" #include "exprs/hybrid_set.h" #include "testutil/column_helper.h" #include "vec/data_types/data_type_number.h" @@ -72,6 +73,7 @@ TEST_F(RuntimeFilterWrapperTest, TestIn) { EXPECT_EQ(wrapper->bloom_filter_func(), nullptr); EXPECT_EQ(wrapper->bitmap_filter_func(), nullptr); EXPECT_NE(wrapper->hybrid_set(), nullptr); + EXPECT_FALSE(wrapper->build_bf_by_runtime_size()); { // Init EXPECT_TRUE(wrapper->init(2).ok()); @@ -191,6 +193,9 @@ 
TEST_F(RuntimeFilterWrapperTest, TestIn) { EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::READY); EXPECT_EQ(wrapper->hybrid_set()->size(), 2); } + EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::IN_FILTER); + EXPECT_EQ(wrapper->column_type(), column_return_type); + EXPECT_EQ(wrapper->contain_null(), false); } TEST_F(RuntimeFilterWrapperTest, TestInAssign) { @@ -268,4 +273,219 @@ TEST_F(RuntimeFilterWrapperTest, TestInAssign) { APPLY_FOR_PRIMITIVE_BASE_TYPE(TYPE_IPV6); } +TEST_F(RuntimeFilterWrapperTest, TestBloom) { + std::vector<int> data_vector(10); + std::iota(data_vector.begin(), data_vector.end(), 0); + using DataType = vectorized::DataTypeInt32; + int32_t filter_id = 0; + auto runtime_size = 80; + RuntimeFilterType filter_type = RuntimeFilterType::BLOOM_FILTER; + bool null_aware = false; + PrimitiveType column_return_type = PrimitiveType::TYPE_INT; + + int32_t max_in_num = 0; + + int64_t runtime_bloom_filter_min_size = 64; + int64_t runtime_bloom_filter_max_size = 128; + bool build_bf_by_runtime_size = false; + int64_t bloom_filter_size = 0; + bool bloom_filter_size_calculated_by_ndv = true; + bool enable_fixed_len_to_uint32_v2 = true; + + bool bitmap_filter_not_in = false; + + std::shared_ptr<RuntimeFilterWrapper> wrapper; + { + bloom_filter_size = 256; + RuntimeFilterParams params { + .filter_id = filter_id, + .filter_type = filter_type, + .column_return_type = column_return_type, + .null_aware = null_aware, + .max_in_num = max_in_num, + .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size, + .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size, + .bloom_filter_size = bloom_filter_size, + .build_bf_by_runtime_size = build_bf_by_runtime_size, + .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv, + .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2, + .bitmap_filter_not_in = bitmap_filter_not_in}; + wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + 
EXPECT_FALSE(wrapper->build_bf_by_runtime_size()); + EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length, + runtime_bloom_filter_max_size); + } + { + bloom_filter_size = 32; + RuntimeFilterParams params { + .filter_id = filter_id, + .filter_type = filter_type, + .column_return_type = column_return_type, + .null_aware = null_aware, + .max_in_num = max_in_num, + .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size, + .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size, + .bloom_filter_size = bloom_filter_size, + .build_bf_by_runtime_size = build_bf_by_runtime_size, + .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv, + .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2, + .bitmap_filter_not_in = bitmap_filter_not_in}; + wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length, + runtime_bloom_filter_min_size); + // Init (set BF size by estimated size from FE) + EXPECT_TRUE(wrapper->init(80).ok()); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length, + runtime_bloom_filter_min_size); + } + { + build_bf_by_runtime_size = true; + bloom_filter_size = 32; + bloom_filter_size_calculated_by_ndv = false; + RuntimeFilterParams params { + .filter_id = filter_id, + .filter_type = filter_type, + .column_return_type = column_return_type, + .null_aware = null_aware, + .max_in_num = max_in_num, + .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size, + .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size, + .bloom_filter_size = bloom_filter_size, + .build_bf_by_runtime_size = build_bf_by_runtime_size, + .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv, + .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2, + .bitmap_filter_not_in = bitmap_filter_not_in}; + wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + 
EXPECT_TRUE(wrapper->build_bf_by_runtime_size()); + EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length, + runtime_bloom_filter_min_size); + // Init (set BF size by exact size from BE) + double m = -BloomFilterFuncBase::K * runtime_size / + std::log(1 - std::pow(BloomFilterFuncBase::FPP, 1.0 / BloomFilterFuncBase::K)); + int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2)))); + auto be_calculate_size = (((int64_t)1) << log_filter_size); + EXPECT_TRUE(wrapper->init(runtime_size).ok()); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length, be_calculate_size); + } + { + build_bf_by_runtime_size = true; + bloom_filter_size = 32; + bloom_filter_size_calculated_by_ndv = true; + RuntimeFilterParams params { + .filter_id = filter_id, + .filter_type = filter_type, + .column_return_type = column_return_type, + .null_aware = null_aware, + .max_in_num = max_in_num, + .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size, + .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size, + .bloom_filter_size = bloom_filter_size, + .build_bf_by_runtime_size = build_bf_by_runtime_size, + .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv, + .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2, + .bitmap_filter_not_in = bitmap_filter_not_in}; + wrapper = std::make_shared<RuntimeFilterWrapper>(&params); + EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length, + runtime_bloom_filter_min_size); + // Init (set BF size by exact size from BE) + auto runtime_size = 80; + double m = -BloomFilterFuncBase::K * runtime_size / + std::log(1 - std::pow(BloomFilterFuncBase::FPP, 1.0 / BloomFilterFuncBase::K)); + int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2)))); + auto be_calculate_size = (((int64_t)1) << log_filter_size); + // Init (set BF size by min size of exact size from BE and the estimated 
size from FE) + EXPECT_TRUE(wrapper->init(runtime_size).ok()); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + EXPECT_EQ(wrapper->bloom_filter_func()->_bloom_filter_length, + std::min(be_calculate_size, runtime_bloom_filter_min_size)); + } + { + // Insert + auto col = vectorized::ColumnHelper::create_column<DataType>(data_vector); + EXPECT_TRUE(wrapper->insert(col, 0).ok()); + EXPECT_EQ(wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + + col = vectorized::ColumnHelper::create_column<DataType>(data_vector); + std::vector<uint8_t> res(10); + wrapper->bloom_filter_func()->find_fixed_len(col, res.data()); + EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool { return i; })); + } + { + PMergeFilterRequest valid_request; + valid_request.set_contain_null(false); + valid_request.set_filter_type(PFilterType::BLOOM_FILTER); + valid_request.set_filter_id(filter_id); + char* data = nullptr; + int len = 0; + wrapper->to_protobuf(valid_request.mutable_bloom_filter(), &data, &len); + + const auto str = std::string(data, len); + butil::IOBuf io_buf; + io_buf.operator=(str); + butil::IOBufAsZeroCopyInputStream stream(io_buf); + RuntimeFilterParams new_params { + .filter_id = filter_id, + .filter_type = filter_type, + .column_return_type = column_return_type, + .null_aware = null_aware, + .max_in_num = max_in_num, + .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size, + .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size, + .bloom_filter_size = bloom_filter_size, + .build_bf_by_runtime_size = build_bf_by_runtime_size, + .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv, + .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2, + .bitmap_filter_not_in = bitmap_filter_not_in}; + auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&new_params); + EXPECT_TRUE(new_wrapper->assign(valid_request, &stream).ok()); + + auto col = 
vectorized::ColumnHelper::create_column<DataType>(data_vector); + std::vector<uint8_t> res(10); + wrapper->bloom_filter_func()->find_fixed_len(col, res.data()); + EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool { return i; })); + } + { + RuntimeFilterParams new_params { + .filter_id = filter_id, + .filter_type = filter_type, + .column_return_type = column_return_type, + .null_aware = null_aware, + .max_in_num = max_in_num, + .runtime_bloom_filter_min_size = runtime_bloom_filter_min_size, + .runtime_bloom_filter_max_size = runtime_bloom_filter_max_size, + .bloom_filter_size = bloom_filter_size, + .build_bf_by_runtime_size = build_bf_by_runtime_size, + .bloom_filter_size_calculated_by_ndv = bloom_filter_size_calculated_by_ndv, + .enable_fixed_len_to_uint32_v2 = enable_fixed_len_to_uint32_v2, + .bitmap_filter_not_in = bitmap_filter_not_in}; + auto new_wrapper = std::make_shared<RuntimeFilterWrapper>(&new_params); + EXPECT_TRUE(new_wrapper->init(runtime_size).ok()); + EXPECT_EQ(new_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + EXPECT_EQ(new_wrapper->bloom_filter_func()->_bloom_filter_length, + wrapper->bloom_filter_func()->_bloom_filter_length); + // Insert + std::vector<int> new_data_vector(10); + std::iota(new_data_vector.begin(), new_data_vector.end(), 10); + auto col = vectorized::ColumnHelper::create_column<DataType>(new_data_vector); + EXPECT_TRUE(new_wrapper->insert(col, 0).ok()); + EXPECT_EQ(new_wrapper->get_state(), RuntimeFilterWrapper::State::UNINITED); + new_wrapper->_state = RuntimeFilterWrapper::State::READY; + // Merge + std::vector<int> res_data_vector(20); + std::iota(res_data_vector.begin(), res_data_vector.end(), 0); + EXPECT_TRUE(wrapper->merge(new_wrapper.get()).ok()); + col = vectorized::ColumnHelper::create_column<DataType>(res_data_vector); + std::vector<uint8_t> res(20); + wrapper->bloom_filter_func()->find_fixed_len(col, res.data()); + EXPECT_TRUE(std::all_of(res.begin(), res.end(), [](uint8_t i) -> bool 
{ return i; })); + } + EXPECT_EQ(wrapper->filter_id(), filter_id); + EXPECT_TRUE(wrapper->is_valid()); + EXPECT_EQ(wrapper->get_real_type(), RuntimeFilterType::BLOOM_FILTER); + EXPECT_EQ(wrapper->column_type(), column_return_type); + EXPECT_EQ(wrapper->contain_null(), false); +} + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org