This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9d710a686f79de6956de3024f3b2a1c4cec17f94 Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Fri Mar 7 10:18:56 2025 +0800 [test](ut) add ut for bloom filter and hybridset (#48698) ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/exprs/bloom_filter_func.h | 14 +- be/src/exprs/minmax_predicate.h | 16 +- .../runtime_filter_producer_helper.h | 12 +- be/test/CMakeLists.txt | 1 - be/test/exprs/bloom_filter_func_test.cpp | 505 +++++++++++++++++++++ be/test/exprs/hybrid_set_test.cpp | 301 +++++++++++- be/test/exprs/minmax_predicate_test.cpp | 107 +++++ .../operator/partitioned_hash_join_test_helper.h | 15 + 8 files changed, 948 insertions(+), 23 deletions(-) diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index 3df8be080ad..df8c00b8060 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -78,16 +78,12 @@ public: if (other->_bloom_filter == nullptr) { return Status::InternalError("other->_bloom_filter is nullptr"); } - // If `_inited` is false, there is no memory allocated 
in bloom filter and this is the first - // call for `merge` function. So we just reuse this bloom filter, and we don't need to - // allocate memory again. - if (!_bloom_filter_alloced) { - if (_bloom_filter != nullptr) { - return Status::InternalError("_bloom_filter must is nullptr"); - } - light_copy(other); - return Status::OK(); + + if (_bloom_filter_alloced == 0) { + return Status::InternalError("bloom filter is not initialized"); } + DCHECK(_bloom_filter); + if (_bloom_filter_alloced != other->_bloom_filter_alloced) { return Status::InternalError( "bloom filter size not the same: already allocated bytes {}, expected " diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h index 832411fd77c..0e62d86567a 100644 --- a/be/src/exprs/minmax_predicate.h +++ b/be/src/exprs/minmax_predicate.h @@ -45,6 +45,7 @@ public: template <class T, bool NeedMax = true, bool NeedMin = true> class MinMaxNumFunc : public MinMaxFuncBase { public: + static constexpr bool IsStringValue = std::is_same_v<T, std::string>; MinMaxNumFunc(bool null_aware) : MinMaxFuncBase(null_aware) {} ~MinMaxNumFunc() override = default; @@ -67,7 +68,12 @@ public: Status merge(MinMaxFuncBase* minmax_func) override { auto* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func); if constexpr (NeedMin) { - if (other_minmax->_min < _min) { + if constexpr (IsStringValue) { + if (other_minmax->_min < _min || !_min_value_set) { + _min = other_minmax->_min; + _min_value_set = true; + } + } else if (other_minmax->_min < _min) { _min = other_minmax->_min; } } @@ -86,6 +92,9 @@ public: void* get_min() override { return &_min; } Status assign(void* min_data, void* max_data) override { + if constexpr (IsStringValue) { + _min_value_set = true; + } _min = *(T*)min_data; _max = *(T*)max_data; return Status::OK(); @@ -108,8 +117,9 @@ private: for (size_t i = start; i < size; i++) { if (nullmap == nullptr || !nullmap[i]) { if constexpr (NeedMin) { - if (column_string.get_data_at(i) < 
StringRef(_min)) { + if (column_string.get_data_at(i) < StringRef(_min) || !_min_value_set) { _min = column_string.get_data_at(i).to_string(); + _min_value_set = true; } } if constexpr (NeedMax) { @@ -172,6 +182,8 @@ private: T _max = type_limit<T>::min(); T _min = type_limit<T>::max(); + + bool _min_value_set = false; }; template <class T> diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h b/be/src/runtime_filter/runtime_filter_producer_helper.h index dfd6b6bccf9..cd0f3c7536c 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper.h +++ b/be/src/runtime_filter/runtime_filter_producer_helper.h @@ -17,6 +17,7 @@ #pragma once +#include "common/be_mock_util.h" #include "common/status.h" #include "runtime/runtime_state.h" #include "runtime_filter/runtime_filter.h" @@ -45,16 +46,21 @@ public: _runtime_filter_compute_timer = ADD_TIMER_WITH_LEVEL(_profile, "BuildTime", 1); } +#ifdef BE_TEST + RuntimeFilterProducerHelper() : _should_build_hash_table(true), _is_broadcast_join(false) {} +#endif + // create and register runtime filters producers Status init(RuntimeState* state, const vectorized::VExprContextSPtrs& build_expr_ctxs, const std::vector<TRuntimeFilterDesc>& runtime_filter_descs); // send local size to remote to sync global rf size if needed - Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, - std::shared_ptr<pipeline::CountedFinishDependency> dependency); + MOCK_FUNCTION Status + send_filter_size(RuntimeState* state, uint64_t hash_table_size, + std::shared_ptr<pipeline::CountedFinishDependency> dependency); // skip all runtime filter process, send size and rf to remote imeediately, mainly used to make join spill instance do not block other instance - Status skip_process(RuntimeState* state); + MOCK_FUNCTION Status skip_process(RuntimeState* state); // build rf's predicate and publish rf Status process(RuntimeState* state, const vectorized::Block* block, diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt 
index 4968a64d078..a6dd65f043d 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -49,7 +49,6 @@ list(REMOVE_ITEM UT_FILES ${CMAKE_CURRENT_SOURCE_DIR}/runtime/decimal_value_test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/util/decompress_test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/util/url_coding_test.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/exprs/hybrid_set_test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/io/fs/remote_file_system_test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/olap/remote_rowset_gc_test.cpp ${CMAKE_CURRENT_SOURCE_DIR}/runtime/jsonb_value_test.cpp diff --git a/be/test/exprs/bloom_filter_func_test.cpp b/be/test/exprs/bloom_filter_func_test.cpp new file mode 100644 index 00000000000..5562132cc58 --- /dev/null +++ b/be/test/exprs/bloom_filter_func_test.cpp @@ -0,0 +1,505 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exprs/bloom_filter_func.h" + +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> + +#include <cstdint> +#include <memory> +#include <numeric> +#include <string> +#include <vector> + +#include "common/object_pool.h" +#include "common/status.h" +#include "exprs/create_predicate_function.h" +#include "gtest/gtest.h" +#include "gtest/gtest_pred_impl.h" +#include "runtime/define_primitive_type.h" +#include "runtime/primitive_type.h" +#include "testutil/column_helper.h" +#include "vec/columns/column_decimal.h" + +namespace doris { +class BloomFilterFuncTest : public testing::Test { +protected: + BloomFilterFuncTest() = default; + ~BloomFilterFuncTest() override = default; + void SetUp() override {} + void TearDown() override {} +}; + +TEST_F(BloomFilterFuncTest, Init) { + BloomFilterFunc<PrimitiveType::TYPE_INT> bloom_filter_func(false); + try { + bloom_filter_func.contain_null(); + ASSERT_TRUE(false) << "No exception thrown"; + } catch (...) { + } + + const size_t runtime_length = 1024; + RuntimeFilterParams params {1, + RuntimeFilterType::BLOOM_FILTER, + PrimitiveType::TYPE_INT, + false, + 0, + 0, + 0, + 256, + 0, + 0, + false, + false}; + bloom_filter_func.init_params(&params); + + try { + bloom_filter_func.contain_null(); + ASSERT_TRUE(false) << "No exception thrown"; + } catch (...) 
{ + } + + auto st = bloom_filter_func.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st.ok()) << "Failed to init bloom filter with fixed length: " << st.to_string(); + + BloomFilterFunc<PrimitiveType::TYPE_INT> bloom_filter_func2(true); + params.null_aware = false; + params.build_bf_by_runtime_size = true; + + bloom_filter_func2.init_params(&params); + st = bloom_filter_func2.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st.ok()) << "Failed to init bloom filter with fixed length: " << st.to_string(); + ASSERT_EQ(bloom_filter_func2._bloom_filter_length, runtime_length); + + bloom_filter_func.light_copy(&bloom_filter_func2); + bloom_filter_func2.light_copy(&bloom_filter_func); +} + +TEST_F(BloomFilterFuncTest, InsertSet) { + BloomFilterFunc<PrimitiveType::TYPE_INT> bloom_filter_func(false); + const size_t runtime_length = 1024; + RuntimeFilterParams params {1, + RuntimeFilterType::BLOOM_FILTER, + PrimitiveType::TYPE_INT, + false, + 0, + 0, + 0, + 256, + 0, + 0, + false, + false}; + bloom_filter_func.init_params(&params); + auto st = bloom_filter_func.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st.ok()) << "Failed to init bloom filter with fixed length: " << st.to_string(); + + auto set = std::make_shared<HybridSet<PrimitiveType::TYPE_INT>>(bloom_filter_func._null_aware); + int32_t a = 0; + set->insert(&a); + a = 1; + set->insert(&a); + a = 2; + set->insert(&a); + a = 3; + set->insert(&a); + a = 4; + set->insert(&a); + a = 4; + set->insert(&a); + + bloom_filter_func.insert_set(set); + auto column = vectorized::ColumnHelper::create_column<vectorized::DataTypeInt32>({1, 2, 3, 4}); + + vectorized::PODArray<uint8_t> result(column->size()); + bloom_filter_func.find_fixed_len(column, result.data()); + for (size_t i = 0; i < column->size(); ++i) { + ASSERT_TRUE(result[i]); + } + + BloomFilterFunc<PrimitiveType::TYPE_INT> bloom_filter_func2(false); + params.enable_fixed_len_to_uint32_v2 = true; + bloom_filter_func2.init_params(&params); + st = 
bloom_filter_func2.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st.ok()) << "Failed to init bloom filter with fixed length: " << st.to_string(); + + bloom_filter_func2.insert_set(set); + + bloom_filter_func2.find_fixed_len(column, result.data()); + for (size_t i = 0; i < column->size(); ++i) { + ASSERT_TRUE(result[i]); + } +} + +TEST_F(BloomFilterFuncTest, InsertFixedLen) { + BloomFilterFunc<PrimitiveType::TYPE_INT> bloom_filter_func(true); + const size_t runtime_length = 1024; + RuntimeFilterParams params {1, + RuntimeFilterType::BLOOM_FILTER, + PrimitiveType::TYPE_INT, + false, + 0, + 0, + 0, + 256, + 0, + 0, + false, + false}; + bloom_filter_func.init_params(&params); + auto st = bloom_filter_func.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st.ok()) << "Failed to init bloom filter with fixed length: " << st.to_string(); + + auto column = vectorized::ColumnHelper::create_column<vectorized::DataTypeInt32>({1, 2, 3, 4}); + auto nullmap_column = vectorized::ColumnUInt8::create(4, 0); + nullmap_column->get_data()[1] = 1; + nullmap_column->get_data()[3] = 1; + auto nullable_column = + vectorized::ColumnNullable::create(std::move(column), std::move(nullmap_column)); + + bloom_filter_func.insert_fixed_len(nullable_column, 0); + + ASSERT_TRUE(nullable_column->has_null()); + bloom_filter_func.set_contain_null(true); + ASSERT_TRUE(bloom_filter_func.contain_null()); + + BloomFilterFunc<PrimitiveType::TYPE_STRING> bloom_filter_func2(true); + params.column_return_type = PrimitiveType::TYPE_STRING; + bloom_filter_func2.init_params(&params); + st = bloom_filter_func2.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st.ok()) << "Failed to init bloom filter with fixed length: " << st.to_string(); + + auto column_string = vectorized::ColumnHelper::create_column<vectorized::DataTypeString>( + {"aa", "bb", "cc", "dd"}); + nullmap_column = vectorized::ColumnUInt8::create(4, 0); + nullmap_column->get_data()[1] = 1; + nullmap_column->get_data()[3] = 1; + nullable_column 
= + vectorized::ColumnNullable::create(column_string->clone(), nullmap_column->clone()); + + bloom_filter_func2.insert_fixed_len(nullable_column, 0); + + ASSERT_TRUE(nullable_column->has_null()); + + ASSERT_TRUE(bloom_filter_func2.contain_null()); + + vectorized::PODArray<uint16_t> offsets(4); + std::iota(offsets.begin(), offsets.end(), 0); + + std::vector<StringRef> strings(4); + strings[0] = StringRef("aa"); + strings[1] = StringRef("bb"); + strings[2] = StringRef("cc"); + strings[3] = StringRef("dd"); + + auto find_count = bloom_filter_func2.find_fixed_len_olap_engine( + reinterpret_cast<const char*>(strings.data()), nullmap_column->get_data().data(), + offsets.data(), 4, false); + + ASSERT_EQ(find_count, 4); + + nullmap_column->get_data()[1] = 0; + nullmap_column->get_data()[3] = 0; + find_count = bloom_filter_func2.find_fixed_len_olap_engine( + reinterpret_cast<const char*>(strings.data()), nullmap_column->get_data().data(), + offsets.data(), 4, false); + + ASSERT_EQ(find_count, 2); + ASSERT_EQ(offsets[0], 0); + ASSERT_EQ(offsets[1], 2); +} + +TEST_F(BloomFilterFuncTest, Merge) { + BloomFilterFunc<PrimitiveType::TYPE_INT> bloom_filter_func(false); + const size_t runtime_length = 1024; + RuntimeFilterParams params {1, + RuntimeFilterType::BLOOM_FILTER, + PrimitiveType::TYPE_INT, + false, + 0, + 0, + 0, + 256, + 0, + 0, + false, + false}; + bloom_filter_func.init_params(&params); + auto st = bloom_filter_func.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st.ok()) << "Failed to init bloom filter with fixed length: " << st.to_string(); + + auto set = std::make_shared<HybridSet<PrimitiveType::TYPE_INT>>(bloom_filter_func._null_aware); + int32_t a = 0; + set->insert(&a); + a = 1; + set->insert(&a); + a = 2; + set->insert(&a); + a = 3; + set->insert(&a); + a = 4; + set->insert(&a); + a = 4; + set->insert(&a); + + bloom_filter_func.insert_set(set); + + auto set2 = std::make_shared<HybridSet<PrimitiveType::TYPE_INT>>(bloom_filter_func._null_aware); + + a = 7; + 
set2->insert(&a); + a = 8; + set2->insert(&a); + + BloomFilterFunc<PrimitiveType::TYPE_INT> bloom_filter_func2(false); + + st = bloom_filter_func.merge(nullptr); + ASSERT_FALSE(st); + + st = bloom_filter_func.merge(&bloom_filter_func2); + ASSERT_FALSE(st); + + // `bloom_filter_func2` is not initialized, merge should fail + st = bloom_filter_func2.merge(&bloom_filter_func); + ASSERT_FALSE(st); + + bloom_filter_func2.init_params(&params); + st = bloom_filter_func2.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st.ok()) << "Failed to init bloom filter with fixed length: " << st.to_string(); + bloom_filter_func2.insert_set(set2); + + st = bloom_filter_func2.merge(&bloom_filter_func); + ASSERT_TRUE(st.ok()) << "Failed to merge bloom filter: " << st.to_string(); + + st = bloom_filter_func.merge(&bloom_filter_func2); + ASSERT_TRUE(st.ok()) << "Failed to merge bloom filter: " << st.to_string(); + + auto column = + vectorized::ColumnHelper::create_column<vectorized::DataTypeInt32>({1, 2, 3, 4, 7, 8}); + vectorized::PODArray<uint8_t> result(column->size()); + bloom_filter_func2.find_fixed_len(column, result.data()); + + for (size_t i = 0; i < column->size(); ++i) { + ASSERT_TRUE(result[i]); + } + + bloom_filter_func.find_fixed_len(column, result.data()); + for (size_t i = 0; i < column->size(); ++i) { + ASSERT_TRUE(result[i]); + } + + BloomFilterFunc<PrimitiveType::TYPE_INT> bloom_filter_func3(false); + params.bloom_filter_size = 512; + + bloom_filter_func3.init_params(&params); + st = bloom_filter_func3.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st) << "Failed to init bloom filter with fixed length: " << st.to_string(); + + st = bloom_filter_func3.merge(&bloom_filter_func); + ASSERT_FALSE(st); +} + +TEST_F(BloomFilterFuncTest, MergeLargeData) { + BloomFilterFunc<PrimitiveType::TYPE_INT> bloom_filter_func(false); + const size_t runtime_length = 1024; + RuntimeFilterParams params {1, + RuntimeFilterType::BLOOM_FILTER, + PrimitiveType::TYPE_INT, + false, + 0, + 0, + 
0, + 256, + 0, + 0, + false, + false}; + bloom_filter_func.init_params(&params); + auto st = bloom_filter_func.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st.ok()) << "Failed to init bloom filter with fixed length: " << st.to_string(); + + auto set = std::make_shared<HybridSet<PrimitiveType::TYPE_INT>>(bloom_filter_func._null_aware); + + const int32_t count = 1024 * 1024; + std::vector<int32_t> data1(count); + for (int32_t i = 0; i != count; ++i) { + set->insert(&i); + data1[i] = i; + } + + bloom_filter_func.insert_set(set); + + auto set2 = std::make_shared<HybridSet<PrimitiveType::TYPE_INT>>(bloom_filter_func._null_aware); + + const int32_t count2 = 1024 * 512; + std::vector<int32_t> data2(count2); + for (int32_t i = 0; i != 1024 * 512; ++i) { + auto a = i % 10 * i + i; + set2->insert(&a); + data2[i] = a; + } + + BloomFilterFunc<PrimitiveType::TYPE_INT> bloom_filter_func2(false); + + st = bloom_filter_func.merge(nullptr); + ASSERT_FALSE(st); + + st = bloom_filter_func.merge(&bloom_filter_func2); + ASSERT_FALSE(st); + + // `bloom_filter_func2` is not initialized, merge should fail + st = bloom_filter_func2.merge(&bloom_filter_func); + ASSERT_FALSE(st); + + bloom_filter_func2.init_params(&params); + st = bloom_filter_func2.init_with_fixed_length(runtime_length); + ASSERT_TRUE(st.ok()) << "Failed to init bloom filter with fixed length: " << st.to_string(); + bloom_filter_func2.insert_set(set2); + + st = bloom_filter_func2.merge(&bloom_filter_func); + ASSERT_TRUE(st.ok()) << "Failed to merge bloom filter: " << st.to_string(); + + st = bloom_filter_func.merge(&bloom_filter_func2); + ASSERT_TRUE(st.ok()) << "Failed to merge bloom filter: " << st.to_string(); + + auto column = vectorized::ColumnHelper::create_column<vectorized::DataTypeInt32>(data1); + auto column2 = vectorized::ColumnHelper::create_column<vectorized::DataTypeInt32>(data2); + vectorized::PODArray<uint8_t> result(column->size()); + bloom_filter_func2.find_fixed_len(column, result.data()); + + for 
(size_t i = 0; i < column->size(); ++i) { + ASSERT_TRUE(result[i]); + } + + bloom_filter_func.find_fixed_len(column, result.data()); + for (size_t i = 0; i < column->size(); ++i) { + ASSERT_TRUE(result[i]); + } + + result.resize(column2->size()); + bloom_filter_func2.find_fixed_len(column2, result.data()); + + for (size_t i = 0; i < column2->size(); ++i) { + ASSERT_TRUE(result[i]); + } + + bloom_filter_func.find_fixed_len(column2, result.data()); + for (size_t i = 0; i < column2->size(); ++i) { + ASSERT_TRUE(result[i]); + } +} + +TEST_F(BloomFilterFuncTest, FindDictOlapEngine) { + const size_t count = 4096; + + std::vector<StringRef> dicts = {StringRef("aa"), StringRef("bb"), StringRef("cc"), + StringRef("dd"), StringRef("aab"), StringRef("bbc"), + StringRef("ccd"), StringRef("dde")}; + auto column = vectorized::ColumnDictI32::create(); + column->reserve(count); + std::vector<int32_t> data(count); + for (size_t i = 0; i != count; ++i) { + data[i] = i % dicts.size(); + } + + column->insert_many_dict_data(data.data(), 0, dicts.data(), count, dicts.size()); + column->initialize_hash_values_for_runtime_filter(); + + BloomFilterFunc<PrimitiveType::TYPE_STRING> bloom_filter_func(false); + RuntimeFilterParams params {1, + RuntimeFilterType::BLOOM_FILTER, + PrimitiveType::TYPE_INT, + false, + 0, + 0, + 0, + 256, + 0, + 0, + false, + false}; + bloom_filter_func.init_params(&params); + auto st = bloom_filter_func.init_with_fixed_length(0); + ASSERT_TRUE(st) << "Failed to init bloom filter with fixed length: " << st.to_string(); + + auto string_column = vectorized::ColumnString::create(); + for (auto& dict : dicts) { + string_column->insert_data(dict.data, dict.size); + } + + bloom_filter_func.insert_fixed_len(std::move(string_column), 0); + + vectorized::PODArray<uint16_t> offsets(count); + std::iota(offsets.begin(), offsets.end(), 0); + + auto find_count = bloom_filter_func.find_dict_olap_engine<false>(column.get(), nullptr, + offsets.data(), count); + ASSERT_EQ(find_count, 
count); + + vectorized::PODArray<uint8_t> nullmap; + uint8_t flag = 0; + nullmap.assign(count, flag); + find_count = bloom_filter_func.find_dict_olap_engine<true>(column.get(), nullmap.data(), + offsets.data(), count); + ASSERT_EQ(find_count, count); +} + +TEST_F(BloomFilterFuncTest, FindFixedLenOlapEngine) { + const size_t count = 4096; + + BloomFilterFunc<PrimitiveType::TYPE_DECIMAL256> bloom_filter_func(true); + RuntimeFilterParams params {1, + RuntimeFilterType::BLOOM_FILTER, + PrimitiveType::TYPE_INT, + false, + 0, + 0, + 0, + 256, + 0, + 0, + false, + false}; + bloom_filter_func.init_params(&params); + auto st = bloom_filter_func.init_with_fixed_length(0); + ASSERT_TRUE(st) << "Failed to init bloom filter with fixed length: " << st.to_string(); + + auto decimal_column = vectorized::ColumnDecimal256::create(0, 8); + auto decimal_column2 = vectorized::ColumnDecimal256::create(0, 8); + decimal_column->reserve(count); + decimal_column2->reserve(count); + for (size_t i = 0; i != count; ++i) { + vectorized::Decimal256 value = vectorized::Decimal256::from_int_frac(wide::Int256(i), 4, 8); + decimal_column->insert_data(reinterpret_cast<const char*>(&value), sizeof(value)); + decimal_column2->insert_data(reinterpret_cast<const char*>(&value), sizeof(value)); + } + bloom_filter_func.insert_fixed_len(std::move(decimal_column), 0); + + vectorized::PODArray<uint16_t> offsets(count); + std::iota(offsets.begin(), offsets.end(), 0); + + vectorized::PODArray<uint8_t> nullmap; + uint8_t flag = 0; + nullmap.assign(count, flag); + auto find_count = bloom_filter_func.find_fixed_len_olap_engine( + reinterpret_cast<const char*>(decimal_column2->get_data().data()), nullmap.data(), + offsets.data(), count, true); + ASSERT_EQ(find_count, count); +} + +} // namespace doris \ No newline at end of file diff --git a/be/test/exprs/hybrid_set_test.cpp b/be/test/exprs/hybrid_set_test.cpp index eabd3f0b9d6..f7f32d7ac37 100644 --- a/be/test/exprs/hybrid_set_test.cpp +++ 
b/be/test/exprs/hybrid_set_test.cpp @@ -19,10 +19,13 @@ #include <gtest/gtest.h> +#include <memory> #include <string> #include "common/config.h" #include "exprs/create_predicate_function.h" +#include "gtest/internal/gtest-internal.h" +#include "testutil/column_helper.h" namespace doris { @@ -35,7 +38,7 @@ protected: }; TEST_F(HybridSetTest, bool) { - HybridSetBase* set = create_set(TYPE_BOOLEAN); + std::unique_ptr<HybridSetBase> set(create_set(PrimitiveType::TYPE_BOOLEAN, false)); bool a = true; set->insert(&a); a = false; @@ -60,7 +63,7 @@ TEST_F(HybridSetTest, bool) { } TEST_F(HybridSetTest, tinyint) { - HybridSetBase* set = create_set(TYPE_TINYINT); + std::unique_ptr<HybridSetBase> set(create_set(PrimitiveType::TYPE_TINYINT, false)); int8_t a = 0; set->insert(&a); a = 1; @@ -97,7 +100,7 @@ TEST_F(HybridSetTest, tinyint) { EXPECT_FALSE(set->find(&a)); } TEST_F(HybridSetTest, smallint) { - HybridSetBase* set = create_set(TYPE_SMALLINT); + std::unique_ptr<HybridSetBase> set(create_set(PrimitiveType::TYPE_SMALLINT, false)); int16_t a = 0; set->insert(&a); a = 1; @@ -133,7 +136,7 @@ TEST_F(HybridSetTest, smallint) { EXPECT_FALSE(set->find(&a)); } TEST_F(HybridSetTest, int) { - HybridSetBase* set = create_set(TYPE_INT); + std::unique_ptr<HybridSetBase> set(create_set(PrimitiveType::TYPE_INT, false)); int32_t a = 0; set->insert(&a); a = 1; @@ -169,7 +172,7 @@ TEST_F(HybridSetTest, int) { EXPECT_FALSE(set->find(&a)); } TEST_F(HybridSetTest, bigint) { - HybridSetBase* set = create_set(TYPE_BIGINT); + std::unique_ptr<HybridSetBase> set(create_set(PrimitiveType::TYPE_BIGINT, false)); int64_t a = 0; set->insert(&a); a = 1; @@ -205,7 +208,7 @@ TEST_F(HybridSetTest, bigint) { EXPECT_FALSE(set->find(&a)); } TEST_F(HybridSetTest, float) { - HybridSetBase* set = create_set(TYPE_FLOAT); + std::unique_ptr<HybridSetBase> set(create_set(PrimitiveType::TYPE_FLOAT, false)); float a = 0; set->insert(&a); a = 1.1; @@ -241,7 +244,7 @@ TEST_F(HybridSetTest, float) { 
EXPECT_FALSE(set->find(&a)); } TEST_F(HybridSetTest, double) { - HybridSetBase* set = create_set(TYPE_DOUBLE); + std::unique_ptr<HybridSetBase> set(create_set(PrimitiveType::TYPE_DOUBLE, false)); double a = 0; set->insert(&a); a = 1.1; @@ -277,7 +280,7 @@ TEST_F(HybridSetTest, double) { EXPECT_FALSE(set->find(&a)); } TEST_F(HybridSetTest, string) { - HybridSetBase* set = create_set(TYPE_VARCHAR); + std::unique_ptr<HybridSetBase> set(create_set(PrimitiveType::TYPE_VARCHAR, false)); StringRef a; char buf[100]; @@ -327,4 +330,286 @@ TEST_F(HybridSetTest, string) { EXPECT_FALSE(set->find(&b)); } +#define TEST_FIXED_CONTAINER(N) \ + std::unique_ptr<HybridSetBase> set(create_set<N>(PrimitiveType::TYPE_INT, false)); \ + \ + auto column = vectorized::ColumnHelper::create_column<vectorized::DataTypeInt32>( \ + {1, 2, 3, 4, 5, 6, 7, 8}); \ + auto result_column = vectorized::ColumnUInt8::create(N, 0); \ + try { \ + set->find_batch(*column, N, result_column->get_data()); \ + ASSERT_TRUE(false) << "should not be here"; \ + } catch (...) 
{ \ + } \ + \ + for (size_t i = 0; i != N; ++i) { \ + set->insert(&i); \ + } \ + \ + for (size_t i = 0; i != N; ++i) { \ + ASSERT_TRUE(set->find(&i)); \ + } \ + \ + for (size_t i = N; i != 1024; ++i) { \ + ASSERT_FALSE(set->find(&i)); \ + } \ + \ + std::unique_ptr<HybridSetBase> set2(create_set<N>(PrimitiveType::TYPE_INT, false)); \ + set2->insert(set.get()); \ + \ + for (size_t i = 0; i != N; ++i) { \ + ASSERT_TRUE(set2->find(&i)); \ + } \ + \ + for (size_t i = N; i != 1024; ++i) { \ + ASSERT_FALSE(set2->find(&i)); \ + } \ + \ + auto it = set->begin(); \ + while (it->has_next()) { \ + auto value = *(int*)it->get_value(); \ + ASSERT_TRUE(set2->find(&value)) << "cannot find: " << value; \ + it->next(); \ + } \ + PInFilter in_filter; \ + set->to_pb(&in_filter); \ + set->clear(); \ + ASSERT_EQ(set->size(), 0); + +TEST_F(HybridSetTest, FixedContainer) { + { TEST_FIXED_CONTAINER(1); } + { TEST_FIXED_CONTAINER(2); } + { TEST_FIXED_CONTAINER(3); } + { TEST_FIXED_CONTAINER(4); } + { TEST_FIXED_CONTAINER(5); } + { TEST_FIXED_CONTAINER(6); } + { TEST_FIXED_CONTAINER(7); } + { TEST_FIXED_CONTAINER(8); } + + std::unique_ptr<HybridSetBase> set(create_set<8>(PrimitiveType::TYPE_INT, false)); + auto column = vectorized::ColumnHelper::create_column<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 5, 6, 7, 8}); +} + +TEST_F(HybridSetTest, FindBatch) { + std::unique_ptr<HybridSetBase> string_set(create_set(PrimitiveType::TYPE_VARCHAR, true)); + auto string_column = vectorized::ColumnHelper::create_column<vectorized::DataTypeString>( + {"ab", "cd", "ef", "gh", "ij", "kl", "mn", "op"}); + auto nullmap_column = vectorized::ColumnUInt8::create(8, 0); + + auto nullable_column = + vectorized::ColumnNullable::create(string_column->clone(), nullmap_column->clone()); + + string_set->insert_fixed_len(nullable_column->clone(), 0); + ASSERT_EQ(string_set->size(), nullable_column->size()); + + nullmap_column->get_data()[1] = 1; + nullmap_column->get_data()[3] = 1; + nullmap_column->get_data()[6] = 1; 
+ auto nullable_column2 = + vectorized::ColumnNullable::create(string_column->clone(), nullmap_column->clone()); + + std::unique_ptr<HybridSetBase> string_set2(create_set(PrimitiveType::TYPE_VARCHAR, true)); + string_set2->insert_fixed_len(nullable_column2->clone(), 0); + ASSERT_EQ(string_set2->size(), nullable_column2->size() - 3); + ASSERT_TRUE(string_set2->contain_null()); + + auto result_column = vectorized::ColumnUInt8::create(nullable_column2->size(), 0); + string_set->find_batch(*string_column, string_column->size(), result_column->get_data()); + + ASSERT_EQ(result_column->get_data()[0], 1); + ASSERT_EQ(result_column->get_data()[1], 1); + ASSERT_EQ(result_column->get_data()[2], 1); + ASSERT_EQ(result_column->get_data()[3], 1); + ASSERT_EQ(result_column->get_data()[4], 1); + ASSERT_EQ(result_column->get_data()[5], 1); + ASSERT_EQ(result_column->get_data()[6], 1); + ASSERT_EQ(result_column->get_data()[7], 1); + + string_set->find_batch_negative(*string_column, string_column->size(), + result_column->get_data()); + ASSERT_EQ(result_column->get_data()[0], 0); + ASSERT_EQ(result_column->get_data()[1], 0); + ASSERT_EQ(result_column->get_data()[2], 0); + ASSERT_EQ(result_column->get_data()[3], 0); + ASSERT_EQ(result_column->get_data()[4], 0); + ASSERT_EQ(result_column->get_data()[5], 0); + ASSERT_EQ(result_column->get_data()[6], 0); + ASSERT_EQ(result_column->get_data()[7], 0); + + // Only bloom fitler need to handle nullaware(VRuntimeFilterWrapper::execute), + // So HybridSet will return false when find null value. + string_set2->find_batch_nullable(*string_column, string_column->size(), + nullmap_column->get_data(), result_column->get_data()); + ASSERT_EQ(result_column->get_data()[0], 1); + // null value always return false, no metter nullaware or not. 
+ ASSERT_EQ(result_column->get_data()[1], 0); + ASSERT_EQ(result_column->get_data()[2], 1); + ASSERT_EQ(result_column->get_data()[3], 0); + ASSERT_EQ(result_column->get_data()[4], 1); + ASSERT_EQ(result_column->get_data()[5], 1); + ASSERT_EQ(result_column->get_data()[6], 0); + ASSERT_EQ(result_column->get_data()[7], 1); + + string_set2->find_batch_nullable_negative(*string_column, string_column->size(), + nullmap_column->get_data(), + result_column->get_data()); + ASSERT_EQ(result_column->get_data()[0], 0); + ASSERT_EQ(result_column->get_data()[1], 1); + ASSERT_EQ(result_column->get_data()[2], 0); + ASSERT_EQ(result_column->get_data()[3], 1); + ASSERT_EQ(result_column->get_data()[4], 0); + ASSERT_EQ(result_column->get_data()[5], 0); + ASSERT_EQ(result_column->get_data()[6], 1); + ASSERT_EQ(result_column->get_data()[7], 0); + + PInFilter in_filter; + string_set2->to_pb(&in_filter); + string_set2->clear(); +} + +TEST_F(HybridSetTest, StringValueSet) { + auto test_string_value_set = [](size_t n) { + std::unique_ptr<HybridSetBase> string_value_set(create_string_value_set(n, true)); + + string_value_set->insert((const void*)(nullptr)); + ASSERT_TRUE(string_value_set->contain_null()); + + StringRef refs[] = {StringRef("ab"), StringRef("cd"), StringRef("ef"), StringRef("gh"), + StringRef("ij"), StringRef("kl"), StringRef("mn"), StringRef("op"), + StringRef("qr"), StringRef("st"), StringRef("uv"), StringRef("wx")}; + for (size_t i = 0; i != n; ++i) { + string_value_set->insert((const void*)&refs[i]); + } + + for (size_t i = 0; i != 12; ++i) { + ASSERT_EQ(string_value_set->find((const void*)&refs[i]), i < n); + } + + StringRef tmp("abc"); + ASSERT_FALSE(string_value_set->find((const void*)&tmp)); + + string_value_set->clear(); + + const char* strings[] = {"ab", "cd", "ef", "gh", "ij", "kl", + "mn", "op", "qr", "st", "uv", "wx"}; + for (size_t i = 0; i != n; ++i) { + string_value_set->insert((void*)strings[i], strlen(strings[i])); + } + + for (size_t i = 0; i != 12; ++i) { + 
ASSERT_EQ(string_value_set->find((const void*)&refs[i]), i < n); + ASSERT_EQ(string_value_set->find((const void*)strings[i], strlen(strings[i])), i < n); + } + }; + + for (size_t i = 1; i != 12; ++i) { + test_string_value_set(i); + } + + vectorized::ColumnPtr string_column = + vectorized::ColumnHelper::create_column<vectorized::DataTypeString>( + {"ab", "cd", "ef", "gh", "ij", "kl", "mn", "op", "qr", "st", "uv", "wx"}); + auto nullmap_column = vectorized::ColumnUInt8::create(12, 0); + + vectorized::ColumnPtr nullable_column = + vectorized::ColumnNullable::create(string_column->clone(), nullmap_column->clone()); + + std::unique_ptr<HybridSetBase> string_value_set(create_string_value_set(0, true)); + string_value_set->insert_fixed_len(nullable_column, 0); + + ASSERT_EQ(string_value_set->size(), nullable_column->size()); + + auto results = vectorized::ColumnUInt8::create(string_column->size(), 0); + string_value_set->find_batch(*string_column, string_column->size(), results->get_data()); + for (size_t i = 0; i != string_column->size(); ++i) { + ASSERT_TRUE(results->get_data()[i]); + } + + string_value_set->clear(); + ASSERT_EQ(string_value_set->size(), 0); + + nullmap_column->get_data()[1] = 1; + nullmap_column->get_data()[3] = 1; + nullmap_column->get_data()[6] = 1; + auto nullable_column2 = + vectorized::ColumnNullable::create(string_column, nullmap_column->clone()); + + string_value_set->insert_fixed_len(nullable_column2, 0); + ASSERT_EQ(string_value_set->size(), nullable_column2->size() - 3); + + string_value_set->find_batch(*string_column, string_column->size(), results->get_data()); + for (size_t i = 0; i != string_column->size(); ++i) { + ASSERT_EQ(results->get_data()[i], i != 1 && i != 3 && i != 6); + } + + // insert duplicated strings + string_value_set->insert_fixed_len(nullable_column2, 0); + ASSERT_EQ(string_value_set->size(), nullable_column2->size() - 3); + + string_value_set->find_batch(*string_column, string_column->size(), results->get_data()); + for 
(size_t i = 0; i != string_column->size(); ++i) { + ASSERT_EQ(results->get_data()[i], i != 1 && i != 3 && i != 6); + } + + // test ColumnStr64 + auto string_overflow_size = config::string_overflow_size; + config::string_overflow_size = 10; + Defer defer([string_overflow_size]() { config::string_overflow_size = string_overflow_size; }); + + vectorized::ColumnPtr string64_column = string_column->clone()->convert_column_if_overflow(); + ASSERT_TRUE(string64_column->is_column_string64()); + + string_value_set->clear(); + ASSERT_EQ(string_value_set->size(), 0); + + string_value_set->insert_fixed_len(string64_column, 0); + ASSERT_EQ(string_value_set->size(), string64_column->size()); + + string_value_set->find_batch(*string_column, string_column->size(), results->get_data()); + for (size_t i = 0; i != string_column->size(); ++i) { + ASSERT_TRUE(results->get_data()[i]); + } + + string_value_set->clear(); + ASSERT_EQ(string_value_set->size(), 0); + + vectorized::ColumnNullable::Ptr nullable_column3 = + vectorized::ColumnNullable::create(string64_column->clone(), nullmap_column->clone()); + + string_value_set->insert_fixed_len(nullable_column3, 0); + ASSERT_EQ(string_value_set->size(), string64_column->size() - 3); + + string_value_set->find_batch(*string_column, string_column->size(), results->get_data()); + for (size_t i = 0; i != string_column->size(); ++i) { + ASSERT_EQ(results->get_data()[i], i != 1 && i != 3 && i != 6); + } + + string_value_set->find_batch_negative(*string_column, string_column->size(), + results->get_data()); + for (size_t i = 0; i != string_column->size(); ++i) { + ASSERT_EQ(results->get_data()[i], !(i != 1 && i != 3 && i != 6)); + } + + string_value_set->find_batch_nullable(*string_column, string_column->size(), + nullable_column2->get_null_map_data(), + results->get_data()); + for (size_t i = 0; i != string_column->size(); ++i) { + ASSERT_EQ(results->get_data()[i], (i != 1 && i != 3 && i != 6)); + } + + 
string_value_set->find_batch_nullable_negative(*string_column, string_column->size(), + nullable_column2->get_null_map_data(), + results->get_data()); + for (size_t i = 0; i != string_column->size(); ++i) { + ASSERT_EQ(results->get_data()[i], !(i != 1 && i != 3 && i != 6)); + } + + try { + PInFilter in_filter; + string_value_set->to_pb(&in_filter); + } catch (...) { + } +} + } // namespace doris diff --git a/be/test/exprs/minmax_predicate_test.cpp b/be/test/exprs/minmax_predicate_test.cpp new file mode 100644 index 00000000000..c173ec2508c --- /dev/null +++ b/be/test/exprs/minmax_predicate_test.cpp @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+#include "common/config.h"
+#include "exprs/create_predicate_function.h"
+#include "gtest/internal/gtest-internal.h"
+#include "runtime_filter/utils.h"
+#include "testutil/column_helper.h"
+// Tests for MinMaxNumFunc: each case checks get_min()/get_max() after insert_fixed_len() calls.
+namespace doris {
+class MinmaxPredicateTest : public testing::Test { // fixture; SetUp/TearDown are empty
+protected:
+    MinmaxPredicateTest() {}
+    ~MinmaxPredicateTest() override = default;
+    void SetUp() override {}
+    void TearDown() override {}
+};
+// MinMaxNumFunc<int32_t>: plain column, all-non-null nullable column, nullable with null rows.
+TEST_F(MinmaxPredicateTest, InsertFixedLen) {
+    auto column = vectorized::ColumnHelper::create_column<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 5, 6, 7, 8});
+    // Values 1..8 in a non-nullable column: expect min 1 and max 8.
+    MinMaxNumFunc<int32_t> minmax_num_func(true); // NOTE(review): ctor flag meaning not visible here
+    minmax_num_func.insert_fixed_len(column, 0); // 0: presumably the start row -- TODO confirm
+    ASSERT_EQ(1, *(int32_t*)minmax_num_func.get_min());
+    ASSERT_EQ(8, *(int32_t*)minmax_num_func.get_max());
+    // Same values wrapped in ColumnNullable with an all-zero null map: bounds stay 1 and 8.
+    auto nullable_column = vectorized::ColumnNullable::create(
+            column->clone(), vectorized::ColumnUInt8::create(column->size(), 0));
+    minmax_num_func.insert_fixed_len(nullable_column->clone(), 0);
+    ASSERT_EQ(1, *(int32_t*)minmax_num_func.get_min());
+    ASSERT_EQ(8, *(int32_t*)minmax_num_func.get_max());
+    // Null out rows 1, 3 and 6 (values 2, 4, 7) in the wrapper's null map and insert again.
+    nullable_column->get_null_map_data()[1] = 1;
+    nullable_column->get_null_map_data()[3] = 1;
+    nullable_column->get_null_map_data()[6] = 1;
+    // NOTE(review): 2, 4, 7 are interior values, so min/max cannot reveal if nulls are skipped.
+    minmax_num_func.insert_fixed_len(nullable_column->clone(), 0);
+    ASSERT_EQ(1, *(int32_t*)minmax_num_func.get_min());
+    ASSERT_EQ(8, *(int32_t*)minmax_num_func.get_max());
+}
+// MinMaxNumFunc<std::string>: same sequence for strings, then again with a ColumnString64 input.
+TEST_F(MinmaxPredicateTest, String) {
+    auto column = vectorized::ColumnHelper::create_column<vectorized::DataTypeString>(
+            {"ab", "cd", "ef", "gh", "ij", "kl", "mn", "op"});
+
+    MinMaxNumFunc<std::string> minmax_num_func(true);
+    // Non-nullable string column: expect min "ab" and max "op".
+    minmax_num_func.insert_fixed_len(column, 0);
+    ASSERT_EQ("ab", *(std::string*)minmax_num_func.get_min());
+    ASSERT_EQ("op", *(std::string*)minmax_num_func.get_max());
+    // Nullable wrapper with an all-zero null map: bounds are expected to be unchanged.
+    auto nullable_column = vectorized::ColumnNullable::create(
+            column->clone(), vectorized::ColumnUInt8::create(column->size(), 0));
+    minmax_num_func.insert_fixed_len(nullable_column->clone(), 0);
+    ASSERT_EQ("ab", *(std::string*)minmax_num_func.get_min());
+    ASSERT_EQ("op", *(std::string*)minmax_num_func.get_max());
+    // Null out rows 1, 3 and 6 ("cd", "gh", "mn"), again interior values, and insert once more.
+    nullable_column->get_null_map_data()[1] = 1;
+    nullable_column->get_null_map_data()[3] = 1;
+    nullable_column->get_null_map_data()[6] = 1;
+
+    minmax_num_func.insert_fixed_len(nullable_column->clone(), 0);
+    ASSERT_EQ("ab", *(std::string*)minmax_num_func.get_min());
+    ASSERT_EQ("op", *(std::string*)minmax_num_func.get_max());
+    // Lower config::string_overflow_size to 10; the Defer callback writes the saved value back.
+    auto string_overflow_size = config::string_overflow_size;
+    config::string_overflow_size = 10;
+    Defer defer([string_overflow_size]() { config::string_overflow_size = string_overflow_size; });
+    // With the lowered limit the conversion must yield a ColumnString64 (asserted below).
+    auto string64_column = column->clone()->convert_column_if_overflow();
+    ASSERT_TRUE(string64_column->is_column_string64());
+    // A fresh MinMaxNumFunc is fed the ColumnString64 directly.
+    MinMaxNumFunc<std::string> minmax_num_func2(true);
+    minmax_num_func2.insert_fixed_len(string64_column, 0);
+    ASSERT_EQ("ab", *(std::string*)minmax_num_func2.get_min());
+    ASSERT_EQ("op", *(std::string*)minmax_num_func2.get_max());
+    // NOTE(review): wraps `column`, not `string64_column` -- was nullable ColumnString64 intended?
+    auto nullable_column2 = vectorized::ColumnNullable::create(
+            column->clone(), vectorized::ColumnUInt8::create(column->size(), 0));
+    nullable_column2->get_null_map_data()[1] = 1;
+    nullable_column2->get_null_map_data()[3] = 1;
+    nullable_column2->get_null_map_data()[6] = 1;
+    minmax_num_func2.insert_fixed_len(nullable_column2->clone(), 0);
+    ASSERT_EQ("ab", *(std::string*)minmax_num_func2.get_min());
+    ASSERT_EQ("op", *(std::string*)minmax_num_func2.get_max());
+}
+} // namespace doris \ No newline at end of file diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h index 95865aea21e..38f30bbdcc6 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h +++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h @@ -111,11 +111,26 @@ public: 
class MockHashJoinSharedState : public HashJoinSharedState {};
+class MockRuntimeFilterProducerHelper : public RuntimeFilterProducerHelper { // test stub
+public:
+    MockRuntimeFilterProducerHelper() = default;
+    ~MockRuntimeFilterProducerHelper() override = default;
+    // Override that ignores all three arguments and returns Status::OK().
+    Status send_filter_size(
+            RuntimeState* state, uint64_t hash_table_size,
+            std::shared_ptr<pipeline::CountedFinishDependency> dependency) override {
+        return Status::OK();
+    }
+    // Override that ignores `state` and returns Status::OK().
+    Status skip_process(RuntimeState* state) override { return Status::OK(); }
+};
+
 class MockHashJoinBuildSinkLocalState : public HashJoinBuildSinkLocalState {
 public:
     // DataSinkOperatorXBase* parent, RuntimeState* state MockHashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : HashJoinBuildSinkLocalState(parent, state) { + _runtime_filter_producer_helper = std::make_shared<MockRuntimeFilterProducerHelper>(); _runtime_profile = std::make_unique<RuntimeProfile>("test"); _profile = _runtime_profile.get(); _memory_used_counter = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org