This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch new_join in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/new_join by this push: new 61e3bf7262d update rf 61e3bf7262d is described below commit 61e3bf7262d4d747add178f5a44e356e23be2d3e Author: BiteTheDDDDt <pxl...@qq.com> AuthorDate: Mon Oct 30 18:07:35 2023 +0800 update rf update --- be/src/exprs/bitmapfilter_predicate.h | 8 +- be/src/exprs/block_bloom_filter.hpp | 20 +- be/src/exprs/bloom_filter_func.h | 310 +++++++++++------------------- be/src/exprs/hybrid_set.h | 119 +++++++----- be/src/exprs/minmax_predicate.h | 273 ++++++++------------------ be/src/exprs/runtime_filter.cpp | 111 ++++------- be/src/exprs/runtime_filter.h | 6 +- be/src/exprs/runtime_filter_slots.h | 23 +-- be/src/exprs/runtime_filter_slots_cross.h | 24 +-- be/src/olap/bloom_filter_predicate.h | 60 +----- be/src/vec/columns/column_dictionary.h | 29 +-- be/src/vec/common/hash_table/hash_map.h | 4 + be/src/vec/exprs/vbloom_predicate.cpp | 37 +--- 13 files changed, 335 insertions(+), 689 deletions(-) diff --git a/be/src/exprs/bitmapfilter_predicate.h b/be/src/exprs/bitmapfilter_predicate.h index 743a55c4b6e..8df488cf875 100644 --- a/be/src/exprs/bitmapfilter_predicate.h +++ b/be/src/exprs/bitmapfilter_predicate.h @@ -31,7 +31,7 @@ namespace doris { class BitmapFilterFuncBase : public FilterFuncBase { public: virtual void insert(const void* data) = 0; - virtual void insert_many(const std::vector<const BitmapValue*> bitmaps) = 0; + virtual void insert_many(const std::vector<const BitmapValue*>& bitmaps) = 0; virtual bool empty() = 0; virtual Status assign(BitmapValue* bitmap_value) = 0; virtual void light_copy(BitmapFilterFuncBase* other) { _not_in = other->_not_in; } @@ -60,7 +60,7 @@ public: void insert(const void* data) override; - void insert_many(const std::vector<const BitmapValue*> bitmaps) override; + void insert_many(const std::vector<const BitmapValue*>& bitmaps) override; uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, uint16_t* offsets, int number) override; @@ -75,7 +75,7 @@ public: return Status::OK(); } - void light_copy(BitmapFilterFuncBase* bloomfilter_func) override; + void light_copy(BitmapFilterFuncBase* bitmapfilter_func) override; size_t size() const override { return _bitmap_value->cardinality(); } @@ -108,7 +108,7 @@ void BitmapFilterFunc<type>::insert(const void* data) { } template <PrimitiveType type> -void BitmapFilterFunc<type>::insert_many(const std::vector<const BitmapValue*> bitmaps) { +void BitmapFilterFunc<type>::insert_many(const std::vector<const BitmapValue*>& bitmaps) { if (bitmaps.empty()) { return; } diff --git a/be/src/exprs/block_bloom_filter.hpp b/be/src/exprs/block_bloom_filter.hpp index 654867d6ccc..18c34bbb312 100644 --- a/be/src/exprs/block_bloom_filter.hpp +++ b/be/src/exprs/block_bloom_filter.hpp @@ -73,13 +73,6 @@ public: void insert(uint32_t hash) noexcept; // Same as above with convenience of hashing the key. void insert(const Slice& key) noexcept { - if (key.data) { - insert(HashUtil::murmur_hash3_32(key.data, key.size, _hash_seed)); - } - } - - // This function is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - void insert_crc32_hash(const Slice& key) noexcept { if (key.data) { insert(HashUtil::crc_hash(key.data, key.size, _hash_seed)); } @@ -124,21 +117,12 @@ public: } // Same as above with convenience of hashing the key. bool find(const Slice& key) const noexcept { - if (key.data) { - return find(HashUtil::murmur_hash3_32(key.data, key.size, _hash_seed)); - } else { - return false; - } - } - - // This function is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - bool find_crc32_hash(const Slice& key) const noexcept { if (key.data) { return find(HashUtil::crc_hash(key.data, key.size, _hash_seed)); - } else { - return false; } + return false; } + // Computes the logical OR of this filter with 'other' and stores the result in this // filter. // Notes: diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index a8330250ec0..a7b0904691f 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -53,23 +53,8 @@ public: return _bloom_filter->find(data); } - // This function is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - template <typename T> - bool test_new_hash(T data) const { - if constexpr (std::is_same_v<T, Slice>) { - return _bloom_filter->find_crc32_hash(data); - } else { - return _bloom_filter->find(data); - } - } - void add_bytes(const char* data, size_t len) { _bloom_filter->insert(Slice(data, len)); } - // This function is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - void add_bytes_new_hash(const char* data, size_t len) { - _bloom_filter->insert_crc32_hash(Slice(data, len)); - } - // test_element/find_element only used on vectorized engine template <typename T> bool test_element(T element) const { @@ -96,8 +81,6 @@ private: // Only Used In RuntimeFilter class BloomFilterFuncBase : public FilterFuncBase { public: - BloomFilterFuncBase() : _inited(false) {} - virtual ~BloomFilterFuncBase() = default; Status init(int64_t expect_num, double fpp) { @@ -112,9 +95,8 @@ public: Status init_with_fixed_length() { if (_build_bf_exactly) { return Status::OK(); - } else { - return init_with_fixed_length(_bloom_filter_length); } + return init_with_fixed_length(_bloom_filter_length); } Status init_with_cardinality(const size_t build_bf_cardinality) { @@ -128,9 +110,8 @@ public: // Handle case where ndv == 1 => ceil(log2(m/8)) < 0. int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2)))); return init_with_fixed_length(((int64_t)1) << log_filter_size); - } else { - return Status::OK(); } + return Status::OK(); } Status init_with_fixed_length(int64_t bloom_filter_length) { @@ -157,36 +138,35 @@ public: // allocate memory again. if (_inited) { DCHECK(bloomfilter_func != nullptr); - auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); + auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { - LOG(WARNING) << "bloom filter size not the same: already allocated bytes = " - << _bloom_filter_alloced << ", expected allocated bytes = " - << other_func->_bloom_filter_alloced; - return Status::InvalidArgument("bloom filter size invalid"); + return Status::InvalidArgument( + "bloom filter size not the same: already allocated bytes {}, expected " + "allocated bytes {}", + _bloom_filter_alloced, other_func->_bloom_filter_alloced); } return _bloom_filter->merge(other_func->_bloom_filter.get()); } { std::lock_guard<std::mutex> l(_lock); if (!_inited) { - auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); + auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); DCHECK(_bloom_filter == nullptr); DCHECK(bloomfilter_func != nullptr); _bloom_filter = bloomfilter_func->_bloom_filter; _bloom_filter_alloced = other_func->_bloom_filter_alloced; _inited = true; return Status::OK(); - } else { - DCHECK(bloomfilter_func != nullptr); - auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); - if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { - LOG(WARNING) << "bloom filter size not the same: already allocated bytes = " - << _bloom_filter_alloced << ", expected allocated bytes = " - << other_func->_bloom_filter_alloced; - return Status::InvalidArgument("bloom filter size invalid"); - } - return _bloom_filter->merge(other_func->_bloom_filter.get()); } + DCHECK(bloomfilter_func != nullptr); + auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); + if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { + return Status::InvalidArgument( + "bloom filter size not the same: already allocated bytes {}, expected " + "allocated bytes {}", + _bloom_filter_alloced, other_func->_bloom_filter_alloced); + } + return _bloom_filter->merge(other_func->_bloom_filter.get()); } } @@ -208,7 +188,7 @@ public: size_t get_size() const { return _bloom_filter ? _bloom_filter->size() : 0; } void light_copy(BloomFilterFuncBase* bloomfilter_func) { - auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); + auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); _bloom_filter_alloced = other_func->_bloom_filter_alloced; _bloom_filter = other_func->_bloom_filter; _inited = other_func->_inited; @@ -216,34 +196,21 @@ public: virtual void insert(const void* data) = 0; - // This function is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - virtual void insert_crc32_hash(const void* data) = 0; - virtual bool find(const void* data) const = 0; - // This function is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - virtual bool find_crc32_hash(const void* data) const = 0; - virtual bool find_olap_engine(const void* data) const = 0; virtual bool find_uint32_t(uint32_t data) const = 0; - virtual void insert_fixed_len(const char* data, const int* offsets, int number) = 0; - - virtual void insert_fixed_len(const char* data) = 0; - - virtual uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, - uint16_t* offsets, int number, - bool is_parse_column) = 0; + virtual void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) = 0; - virtual void find_fixed_len(const char* data, const uint8* nullmap, int number, - uint8* results) = 0; + virtual void find_fixed_len(const vectorized::ColumnPtr& column, uint8_t* results) = 0; protected: // bloom filter size int32_t _bloom_filter_alloced; std::shared_ptr<BloomFilterAdaptor> _bloom_filter; - bool _inited; + bool _inited {}; std::mutex _lock; int64_t _bloom_filter_length; bool _build_bf_exactly = false; @@ -251,77 +218,50 @@ protected: template <class T> struct CommonFindOp { - // test_batch/find_batch/find_batch_olap_engine only used on vectorized engine - void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data, const int* offsets, - int number) const { - for (int i = 0; i < number; i++) { - bloom_filter.add_element(*((T*)data + offsets[i])); - } - } - - void insert_single(BloomFilterAdaptor& bloom_filter, const char* data) const { - bloom_filter.add_element(*((T*)data)); - } - - uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data, - const uint8* nullmap, uint16_t* offsets, int number, - const bool is_parse_column) const { - uint16_t new_size = 0; - if (is_parse_column) { - if (nullmap == nullptr) { - for (int i = 0; i < number; i++) { - uint16_t idx = offsets[i]; - if (!bloom_filter.test_element(*((T*)data + idx))) { - continue; - } - offsets[new_size++] = idx; - } - } else { - for (int i = 0; i < number; i++) { - uint16_t idx = offsets[i]; - if (nullmap[idx]) { - continue; - } - if (!bloom_filter.test_element(*((T*)data + idx))) { - continue; - } - offsets[new_size++] = idx; + void insert_batch(BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column, + size_t start) const { + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& col = nullable->get_nested_column(); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + + const T* data = (T*)col.get_raw_data().data; + for (size_t i = start; i < column->size(); i++) { + if (!nullmap[i]) { + bloom_filter.add_element(*(data + i)); } } } else { - if (nullmap == nullptr) { - for (int i = 0; i < number; i++) { - if (!bloom_filter.test_element(*((T*)data + i))) { - continue; - } - offsets[new_size++] = i; - } - } else { - for (int i = 0; i < number; i++) { - if (nullmap[i]) { - continue; - } - if (!bloom_filter.test_element(*((T*)data + i))) { - continue; - } - offsets[new_size++] = i; - } + const T* data = (T*)column->get_raw_data().data; + for (size_t i = start; i < column->size(); i++) { + bloom_filter.add_element(*(data + i)); } } - return new_size; } - void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, const uint8* nullmap, - int number, uint8* results) const { - for (int i = 0; i < number; i++) { - results[i] = false; - if (nullmap != nullptr && nullmap[i]) { - continue; + void find_batch(const BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column, + uint8_t* results) const { + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + + const T* data = (T*)nullable->get_nested_column().get_raw_data().data; + for (size_t i = 0; i < column->size(); i++) { + if (!nullmap[i]) { + results[i] = bloom_filter.test_element(data[i]); + } else { + results[i] = false; + } } - if (!bloom_filter.test_element(*((T*)data + i))) { - continue; + } else { + const T* data = (T*)column->get_raw_data().data; + for (size_t i = 0; i < column->size(); i++) { + results[i] = bloom_filter.test_element(data[i]); } - results[i] = true; } } @@ -340,43 +280,62 @@ struct CommonFindOp { }; struct StringFindOp { - void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data, const int* offsets, - int number) const { - LOG(FATAL) << "StringFindOp does not support insert_batch"; - } - - void insert_single(BloomFilterAdaptor& bloom_filter, const char* data) const { - LOG(FATAL) << "StringFindOp does not support insert_single"; - } - - uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data, - const uint8* nullmap, uint16_t* offsets, int number, - const bool is_parse_column) const { - LOG(FATAL) << "StringFindOp does not support find_batch_olap_engine"; - return 0; - } - - void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, const uint8* nullmap, - int number, uint8* results) const { - LOG(FATAL) << "StringFindOp does not support find_batch"; + static void insert_batch(BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column, + size_t start) { + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& col = + assert_cast<const vectorized::ColumnString&>(nullable->get_nested_column()); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + + for (size_t i = start; i < column->size(); i++) { + if (!nullmap[i]) { + bloom_filter.add_element(col.get_data_at(i)); + } + } + } else { + const auto& col = assert_cast<const vectorized::ColumnString*>(column.get()); + for (size_t i = start; i < column->size(); i++) { + bloom_filter.add_element(col->get_data_at(i)); + } + } } - void insert(BloomFilterAdaptor& bloom_filter, const void* data) const { - const auto* value = reinterpret_cast<const StringRef*>(data); - if (value) { - bloom_filter.add_bytes(value->data, value->size); + static void find_batch(const BloomFilterAdaptor& bloom_filter, + const vectorized::ColumnPtr& column, uint8_t* results) { + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& col = + assert_cast<const vectorized::ColumnString&>(nullable->get_nested_column()); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + + for (size_t i = 0; i < column->size(); i++) { + if (!nullmap[i]) { + results[i] = bloom_filter.test_element(col.get_data_at(i)); + } else { + results[i] = false; + } + } + } else { + const auto& col = assert_cast<const vectorized::ColumnString*>(column.get()); + for (size_t i = 0; i < column->size(); i++) { + results[i] = bloom_filter.test_element(col->get_data_at(i)); + } } } - // This function is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - void insert_crc32_hash(BloomFilterAdaptor& bloom_filter, const void* data) const { + static void insert(BloomFilterAdaptor& bloom_filter, const void* data) { const auto* value = reinterpret_cast<const StringRef*>(data); if (value) { - bloom_filter.add_bytes_new_hash(value->data, value->size); + bloom_filter.add_bytes(value->data, value->size); } } - bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const { + static bool find(const BloomFilterAdaptor& bloom_filter, const void* data) { const auto* value = reinterpret_cast<const StringRef*>(data); if (value == nullptr) { return false; @@ -384,19 +343,11 @@ struct StringFindOp { return bloom_filter.test(Slice(value->data, value->size)); } - //This function is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - bool find_crc32_hash(const BloomFilterAdaptor& bloom_filter, const void* data) const { - const auto* value = reinterpret_cast<const StringRef*>(data); - if (value == nullptr) { - return false; - } - return bloom_filter.test_new_hash(Slice(value->data, value->size)); - } - - bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { + static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) { return StringFindOp::find(bloom_filter, data); } - bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const { + + static bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) { return bloom_filter.test(data); } }; @@ -404,7 +355,7 @@ struct StringFindOp { // We do not need to judge whether data is empty, because null will not appear // when filer used by the storage engine struct FixedStringFindOp : public StringFindOp { - bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* input_data) const { + static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* input_data) { const auto* value = reinterpret_cast<const StringRef*>(input_data); int64_t size = value->size; const char* data = value->data; @@ -417,7 +368,7 @@ struct FixedStringFindOp : public StringFindOp { }; struct DateTimeFindOp : public CommonFindOp<VecDateTimeValue> { - bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { + static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) { VecDateTimeValue value; value.from_olap_datetime(*reinterpret_cast<const uint64_t*>(data)); return bloom_filter.test(Slice((char*)&value, sizeof(VecDateTimeValue))); @@ -428,7 +379,7 @@ struct DateTimeFindOp : public CommonFindOp<VecDateTimeValue> { // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=101684 struct DateFindOp : public CommonFindOp<VecDateTimeValue> { - bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { + static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) { uint24_t date = *static_cast<const uint24_t*>(data); uint64_t value = uint32_t(date); @@ -440,7 +391,7 @@ struct DateFindOp : public CommonFindOp<VecDateTimeValue> { }; struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value> { - bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { + static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) { auto packed_decimal = *static_cast<const decimal12_t*>(data); DecimalV2Value value; int64_t int_value = packed_decimal.integer; @@ -502,37 +453,13 @@ public: dummy.insert(*_bloom_filter, data); } - // This function is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - void insert_crc32_hash(const void* data) override { - if constexpr (std::is_same_v<typename BloomFilterTypeTraits<type>::FindOp, StringFindOp> || - std::is_same_v<typename BloomFilterTypeTraits<type>::FindOp, - FixedStringFindOp>) { - DCHECK(_bloom_filter != nullptr); - dummy.insert_crc32_hash(*_bloom_filter, data); - } else { - insert(data); - } - } - - void insert_fixed_len(const char* data, const int* offsets, int number) override { + void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { DCHECK(_bloom_filter != nullptr); - dummy.insert_batch(*_bloom_filter, data, offsets, number); + dummy.insert_batch(*_bloom_filter, column, start); } - void insert_fixed_len(const char* data) override { - DCHECK(_bloom_filter != nullptr); - dummy.insert_single(*_bloom_filter, data); - } - - uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, uint16_t* offsets, - int number, const bool is_parse_column) override { - return dummy.find_batch_olap_engine(*_bloom_filter, data, nullmap, offsets, number, - is_parse_column); - } - - void find_fixed_len(const char* data, const uint8* nullmap, int number, - uint8* results) override { - dummy.find_batch(*_bloom_filter, data, nullmap, number, results); + void find_fixed_len(const vectorized::ColumnPtr& column, uint8_t* results) override { + dummy.find_batch(*_bloom_filter, column, results); } bool find(const void* data) const override { @@ -540,17 +467,6 @@ public: return dummy.find(*_bloom_filter, data); } - // This function is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - bool find_crc32_hash(const void* data) const override { - if constexpr (std::is_same_v<typename BloomFilterTypeTraits<type>::FindOp, StringFindOp> || - std::is_same_v<typename BloomFilterTypeTraits<type>::FindOp, - FixedStringFindOp>) { - DCHECK(_bloom_filter != nullptr); - return dummy.find_crc32_hash(*_bloom_filter, data); - } - return find(data); - } - bool find_olap_engine(const void* data) const override { return dummy.find_olap_engine(*_bloom_filter, data); } diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h index 6a90bdd47cd..9151dc7d3bd 100644 --- a/be/src/exprs/hybrid_set.h +++ b/be/src/exprs/hybrid_set.h @@ -29,7 +29,7 @@ namespace doris { -#define FIXED_CONTAINER_MAX_SIZE 8 +constexpr int FIXED_CONTAINER_MAX_SIZE = 8; /** * Fix Container can use simd to improve performance. 1 <= N <= 8 can be improved performance by test. FIXED_CONTAINER_MAX_SIZE = 8. @@ -44,7 +44,7 @@ public: class Iterator; - FixedContainer() : _size(0) { static_assert(N >= 0 && N <= FIXED_CONTAINER_MAX_SIZE); } + FixedContainer() { static_assert(N >= 0 && N <= FIXED_CONTAINER_MAX_SIZE); } ~FixedContainer() = default; @@ -141,7 +141,7 @@ public: private: std::array<T, N> _data; - size_t _size; + size_t _size {}; }; /** @@ -183,7 +183,7 @@ public: // use in vectorize execute engine virtual void insert(void* data, size_t) = 0; - virtual void insert_fixed_len(const char* data, const int* offsets, int number) = 0; + virtual void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) = 0; virtual void insert(HybridSetBase* set) { HybridSetBase::IteratorBase* iter = set->begin(); @@ -199,11 +199,6 @@ public: // use in vectorize execute engine virtual bool find(const void* data, size_t) const = 0; - virtual void find_fixed_len(const char* __restrict data, const uint8* __restrict null_map, - int number, uint8* __restrict results) { - LOG(FATAL) << "HybridSetBase not support find_fixed_len"; - } - virtual void find_batch(const doris::vectorized::IColumn& column, size_t rows, doris::vectorized::ColumnUInt8::Container& results) { LOG(FATAL) << "HybridSetBase not support find_batch"; @@ -275,21 +270,29 @@ public: if (data == nullptr) { return; } - - if constexpr (sizeof(ElementType) >= 16) { - // for large int, it will core dump with no memcpy - ElementType value; - memcpy(&value, data, sizeof(ElementType)); - _set.insert(value); + _set.insert(*reinterpret_cast<const ElementType*>(data)); + } + void insert(void* data, size_t /*unused*/) override { insert(data); } + + void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& col = nullable->get_nested_column(); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + + const ElementType* data = (ElementType*)col.get_raw_data().data; + for (size_t i = start; i < column->size(); i++) { + if (!nullmap[i]) { + _set.insert(*(data + i)); + } + } } else { - _set.insert(*reinterpret_cast<const ElementType*>(data)); - } - } - void insert(void* data, size_t) override { insert(data); } - - void insert_fixed_len(const char* data, const int* offsets, int number) override { - for (int i = 0; i < number; i++) { - insert((void*)((ElementType*)data + offsets[i])); + const ElementType* data = (ElementType*)column->get_raw_data().data; + for (size_t i = start; i < column->size(); i++) { + _set.insert(*(data + i)); + } } } @@ -303,21 +306,7 @@ public: return _set.find(*reinterpret_cast<const ElementType*>(data)); } - bool find(const void* data, size_t) const override { return find(data); } - - void find_fixed_len(const char* __restrict data, const uint8* __restrict null_map, int number, - uint8* __restrict results) override { - ElementType* value = (ElementType*)data; - if (null_map == nullptr) { - for (int i = 0; i < number; i++) { - results[i] = _set.find(value[i]); - } - } else { - for (int i = 0; i < number; i++) { - results[i] = _set.find(value[i]) & !null_map[i]; - } - } - } + bool find(const void* data, size_t /*unused*/) const override { return find(data); } void find_batch(const doris::vectorized::IColumn& column, size_t rows, doris::vectorized::ColumnUInt8::Container& results) override { @@ -414,8 +403,26 @@ public: _set.insert(str_value); } - void insert_fixed_len(const char* data, const int* offsets, int number) override { - LOG(FATAL) << "string set not support insert_fixed_len"; + void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& col = + assert_cast<const vectorized::ColumnString&>(nullable->get_nested_column()); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + + for (size_t i = start; i < column->size(); i++) { + if (!nullmap[i]) { + _set.insert(col.get_data_at(i).to_string()); + } + } + } else { + const auto& col = assert_cast<const vectorized::ColumnString*>(column.get()); + for (size_t i = start; i < column->size(); i++) { + _set.insert(col->get_data_at(i).to_string()); + } + } } int size() override { return _set.size(); } @@ -425,7 +432,7 @@ public: return false; } - auto* value = reinterpret_cast<const StringRef*>(data); + const auto* value = reinterpret_cast<const StringRef*>(data); std::string str_value(const_cast<const char*>(value->data), value->size); return _set.find(str_value); } @@ -461,7 +468,7 @@ public: void _find_batch(const doris::vectorized::IColumn& column, size_t rows, const doris::vectorized::NullMap* null_map, doris::vectorized::ColumnUInt8::Container& results) { - auto& col = assert_cast<const doris::vectorized::ColumnString&>(column); + const auto& col = assert_cast<const doris::vectorized::ColumnString&>(column); const uint8_t* __restrict null_map_data; if constexpr (is_nullable) { null_map_data = null_map->data(); @@ -538,8 +545,26 @@ public: _set.insert(sv); } - void insert_fixed_len(const char* data, const int* offsets, int number) override { - LOG(FATAL) << "string set not support insert_fixed_len"; + void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& col = + assert_cast<const vectorized::ColumnString&>(nullable->get_nested_column()); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + + for (size_t i = start; i < column->size(); i++) { + if (!nullmap[i]) { + _set.insert(col.get_data_at(i)); + } + } + } else { + const auto& col = assert_cast<const vectorized::ColumnString*>(column.get()); + for (size_t i = start; i < column->size(); i++) { + _set.insert(col->get_data_at(i)); + } + } } int size() override { return _set.size(); } @@ -549,7 +574,7 @@ public: return false; } - auto* value = reinterpret_cast<const StringRef*>(data); + const auto* value = reinterpret_cast<const StringRef*>(data); return _set.find(*value); } @@ -588,10 +613,10 @@ public: void _find_batch(const doris::vectorized::IColumn& column, size_t rows, const doris::vectorized::NullMap* null_map, doris::vectorized::ColumnUInt8::Container& results) { - auto& col = assert_cast<const doris::vectorized::ColumnString&>(column); + const auto& col = assert_cast<const doris::vectorized::ColumnString&>(column); const uint32_t* __restrict offset = col.get_offsets().data(); const uint8_t* __restrict data = col.get_chars().data(); - uint8_t* __restrict cursor = const_cast<uint8_t*>(data); + auto* __restrict cursor = const_cast<uint8_t*>(data); const uint8_t* __restrict null_map_data; if constexpr (is_nullable) { null_map_data = null_map->data(); diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h index cdf898292fc..efc4ebf8630 100644 --- a/be/src/exprs/minmax_predicate.h +++ b/be/src/exprs/minmax_predicate.h @@ -19,15 +19,16 @@ #include "common/object_pool.h" #include "runtime/type_limit.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" namespace doris { // only used in Runtime Filter class MinMaxFuncBase { public: virtual void insert(const void* data) = 0; - virtual void insert_fixed_len(const char* data, const int* offsets, int number) = 0; + virtual void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) = 0; virtual bool find(void* data) = 0; - virtual bool is_empty() = 0; virtual void* get_max() = 0; virtual void* get_min() = 0; // assign minmax data @@ -37,7 +38,7 @@ public: virtual ~MinMaxFuncBase() = default; }; -template <class T> +template <class T, bool NeedMax = true, bool NeedMin = true> class MinMaxNumFunc : public MinMaxFuncBase { public: MinMaxNumFunc() = default; @@ -50,32 +51,52 @@ public: T val_data = *reinterpret_cast<const T*>(data); - if (_empty) { - _min = val_data; - _max = val_data; - _empty = false; - return; + if constexpr (NeedMin) { + if (val_data < _min) { + _min = val_data; + } } - if (val_data < _min) { - _min = val_data; - } else if (val_data > _max) { - _max = val_data; + + if constexpr (NeedMax) { + if (val_data > _max) { + _max = val_data; + } } } - void insert_fixed_len(const char* data, const int* offsets, int number) override { - if (!number) { + void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { + if (column->empty()) { return; } - if (_empty) { - _min = *((T*)data + offsets[0]); - _max = *((T*)data + offsets[0]); - } - for (int i = _empty; i < number; i++) { - _min = std::min(_min, *((T*)data + offsets[i])); - _max = std::max(_max, *((T*)data + offsets[i])); + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& col = nullable->get_nested_column(); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + + const T* data = (T*)col.get_raw_data().data; + for (size_t i = start; i < column->size(); i++) { + if (!nullmap[i]) { + if constexpr (NeedMin) { + _min = std::min(_min, *(data + i)); + } + if constexpr (NeedMax) { + _max = std::max(_max, *(data + i)); + } + } + } + } else { + const T* data = (T*)column->get_raw_data().data; + for (size_t i = start; i < column->size(); i++) { + if constexpr (NeedMin) { + _min = std::min(_min, *(data + i)); + } + if constexpr (NeedMax) { + _max = std::max(_max, *(data + i)); + } + } } - _empty = false; } bool find(void* data) override { @@ -84,40 +105,55 @@ public: } T val_data = *reinterpret_cast<T*>(data); - return val_data >= _min && val_data <= _max; + if constexpr (NeedMin) { + if (val_data < _min) { + return false; + } + } + if constexpr (NeedMax) { + if (val_data > _max) { + return false; + } + } + return true; } Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override { if constexpr (std::is_same_v<T, StringRef>) { - MinMaxNumFunc<T>* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func); - - if (other_minmax->_min < _min) { - auto& other_min = other_minmax->_min; - auto str = pool->add(new std::string(other_min.data, other_min.size)); - _min.data = str->data(); - _min.size = str->length(); + auto* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func); + if constexpr (NeedMin) { + if (other_minmax->_min < _min) { + auto& other_min = other_minmax->_min; + auto* str = pool->add(new std::string(other_min.data, other_min.size)); + _min.data = str->data(); + _min.size = str->length(); + } } - if (other_minmax->_max > _max) { - auto& other_max = other_minmax->_max; - auto str = pool->add(new std::string(other_max.data, other_max.size)); - _max.data = str->data(); - _max.size = str->length(); + if constexpr (NeedMax) { + if (other_minmax->_max > _max) { + auto& other_max = other_minmax->_max; + auto* str = pool->add(new std::string(other_max.data, other_max.size)); + _max.data = str->data(); + _max.size = str->length(); + } } } else { - MinMaxNumFunc<T>* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func); - if (other_minmax->_min < _min) { - _min = other_minmax->_min; + auto* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func); + if constexpr (NeedMin) { + if (other_minmax->_min < _min) { + _min = other_minmax->_min; + } } - if (other_minmax->_max > _max) { - _max = other_minmax->_max; + if constexpr (NeedMax) { + if (other_minmax->_max > _max) { + _max = other_minmax->_max; + } } } return Status::OK(); } - bool is_empty() override { return _empty; } - void* get_max() override { return &_max; } void* get_min() override { return &_min; } @@ -131,161 +167,12 @@ public: protected: T _max = type_limit<T>::min(); T _min = type_limit<T>::max(); - // we use _empty to avoid compare twice - bool _empty = true; }; template <class T> -class MinNumFunc : public MinMaxNumFunc<T> { -public: - MinNumFunc() = default; - ~MinNumFunc() override = default; - - void insert(const void* data) override { - if (data == nullptr) { - return; - } - - T val_data = *reinterpret_cast<const T*>(data); - - if (this->_empty) { - this->_min = val_data; - this->_empty = false; - return; - } - if (val_data < this->_min) { - this->_min = val_data; - } - } - - void insert_fixed_len(const char* data, const int* offsets, int number) override { - if (!number) { - return; - } - if (this->_empty) { - this->_min = *((T*)data + offsets[0]); - } - for (int i = this->_empty; i < number; i++) { - this->_min = std::min(this->_min, *((T*)data + offsets[i])); - } - this->_empty = false; - } - - bool find(void* data) override { - if (data == nullptr) { - return false; - } - - T val_data = *reinterpret_cast<T*>(data); - return val_data >= this->_min; - } - - Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override { - if constexpr (std::is_same_v<T, StringRef>) { - MinNumFunc<T>* other_minmax = assert_cast<MinNumFunc<T>*>(minmax_func); - if (other_minmax->_min < this->_min) { - auto& other_min = other_minmax->_min; - auto str = pool->add(new std::string(other_min.data, other_min.size)); - this->_min.data = str->data(); - this->_min.size = str->length(); - } - } else { - MinNumFunc<T>* other_minmax = assert_cast<MinNumFunc<T>*>(minmax_func); - if (other_minmax->_min < this->_min) { - this->_min = other_minmax->_min; - } - } - - return Status::OK(); - } - - //min filter the max is useless, so return nullptr directly - void* get_max() override { - DCHECK(false); - return nullptr; - } - - Status assign(void* min_data, void* max_data) override { - this->_min = *(T*)min_data; - return Status::OK(); - } -}; +using MinNumFunc = MinMaxNumFunc<T, false, true>; template <class T> -class MaxNumFunc : public MinMaxNumFunc<T> { -public: - MaxNumFunc() = default; - ~MaxNumFunc() override = default; - - void insert(const void* data) override { - if (data == nullptr) { - return; - } - - T val_data = *reinterpret_cast<const T*>(data); - - if (this->_empty) { - this->_max = val_data; - this->_empty = false; - return; - } - if (val_data > this->_max) { - this->_max = val_data; - } - } - - void insert_fixed_len(const char* data, const int* offsets, int number) override { - if (!number) { - return; - } - if (this->_empty) { - this->_max = *((T*)data + offsets[0]); - } - for (int i = this->_empty; i < number; i++) { - this->_max = std::max(this->_max, *((T*)data + offsets[i])); - } - this->_empty = false; - } - - bool find(void* data) override { - if (data == nullptr) { - return false; - } - - T val_data = *reinterpret_cast<T*>(data); - return val_data <= this->_max; - } - - Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override { - if constexpr (std::is_same_v<T, StringRef>) { - MinMaxNumFunc<T>* other_minmax = assert_cast<MinMaxNumFunc<T>*>(minmax_func); - - if (other_minmax->_max > this->_max) { - auto& other_max = other_minmax->_max; - auto str = pool->add(new std::string(other_max.data, other_max.size)); - this->_max.data = str->data(); - this->_max.size = str->length(); - } - } else { - MinMaxNumFunc<T>* other_minmax = assert_cast<MinMaxNumFunc<T>*>(minmax_func); - if (other_minmax->_max > this->_max) { - this->_max = other_minmax->_max; - } - } - - return Status::OK(); - } - - //max filter the min is useless, so return nullptr directly - void* get_min() override { - DCHECK(false); - return nullptr; - } - - Status assign(void* min_data, void* max_data) override { - this->_max = *(T*)max_data; - return Status::OK(); - } -}; +using MaxNumFunc = MinMaxNumFunc<T, true, false>; } // namespace doris diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index c6e64fd0e55..53c222488e3 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -51,6 +51,7 @@ #include "util/string_parser.hpp" #include "vec/columns/column.h" #include "vec/columns/column_complex.h" +#include "vec/columns/column_nullable.h" #include "vec/common/assert_cast.h" #include "vec/core/wide_integer.h" #include "vec/core/wide_integer_to_string.h" @@ -286,10 +287,7 @@ public: _pool(pool), _column_return_type(params->column_return_type), _filter_type(params->filter_type), - _filter_id(params->filter_id), - _use_batch( - IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)), - _use_new_hash(_be_exec_version >= 2) {} + _filter_id(params->filter_id) {} // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, PrimitiveType column_type, @@ -299,10 +297,7 @@ public: _pool(pool), _column_return_type(column_type), _filter_type(type), - _filter_id(filter_id), - _use_batch( - IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)), - _use_new_hash(_be_exec_version >= 2) {} + _filter_id(filter_id) {} RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool, const RuntimeFilterParams* params) @@ -311,10 +306,7 @@ public: _pool(pool), _column_return_type(params->column_return_type), _filter_type(params->filter_type), - _filter_id(params->filter_id), - _use_batch( - IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)), - _use_new_hash(_be_exec_version >= 2) {} + _filter_id(params->filter_id) {} // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool, PrimitiveType column_type, @@ -324,10 +316,7 @@ public: _pool(pool), _column_return_type(column_type), _filter_type(type), - _filter_id(filter_id), - _use_batch( - IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)), - _use_new_hash(_be_exec_version >= 2) {} + _filter_id(filter_id) {} // init runtime filter wrapper // alloc memory to init runtime filter function Status init(const RuntimeFilterParams* params) { @@ -389,23 +378,10 @@ public: void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { if (_context.hybrid_set->size() > 0) { - auto it = _context.hybrid_set->begin(); - - if (_use_batch) { - while (it->has_next()) { - bloom_filter->insert_fixed_len((char*)it->get_value()); - it->next(); - } - } else { - while (it->has_next()) { - if (_use_new_hash) { - bloom_filter->insert_crc32_hash(it->get_value()); - } else { - bloom_filter->insert(it->get_value()); - } - - it->next(); - } + auto* it = _context.hybrid_set->begin(); + while (it->has_next()) { + bloom_filter->insert(it->get_value()); + it->next(); } } } @@ -428,20 +404,12 @@ public: break; } case RuntimeFilterType::BLOOM_FILTER: { - if (_use_new_hash) { - _context.bloom_filter_func->insert_crc32_hash(data); - } else { - _context.bloom_filter_func->insert(data); - } + _context.bloom_filter_func->insert(data); break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { if (_is_bloomfilter) { - if (_use_new_hash) { - _context.bloom_filter_func->insert_crc32_hash(data); - } else { - _context.bloom_filter_func->insert(data); - } + _context.bloom_filter_func->insert(data); } else { _context.hybrid_set->insert(data); } @@ -457,30 +425,30 @@ public: } } - void insert_fixed_len(const char* data, const int* offsets, int number) { + void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { if (_is_ignored_in_filter) { break; } - _context.hybrid_set->insert_fixed_len(data, offsets, number); + _context.hybrid_set->insert_fixed_len(column, start); break; } case RuntimeFilterType::MIN_FILTER: case RuntimeFilterType::MAX_FILTER: case RuntimeFilterType::MINMAX_FILTER: { - _context.minmax_func->insert_fixed_len(data, offsets, number); + _context.minmax_func->insert_fixed_len(column, start); break; } case RuntimeFilterType::BLOOM_FILTER: { - _context.bloom_filter_func->insert_fixed_len(data, offsets, number); + _context.bloom_filter_func->insert_fixed_len(column, start); break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { if (_is_bloomfilter) { - _context.bloom_filter_func->insert_fixed_len(data, offsets, number); + _context.bloom_filter_func->insert_fixed_len(column, start); } else { - _context.hybrid_set->insert_fixed_len(data, offsets, number); + _context.hybrid_set->insert_fixed_len(column, start); } break; } @@ -508,24 +476,33 @@ public: } } - void insert_batch(const vectorized::ColumnPtr column, const std::vector<int>& rows) { + void insert_batch(const vectorized::ColumnPtr& column, size_t start) { if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) { - bitmap_filter_insert_batch(column, rows); - } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)) { - insert_fixed_len(column->get_raw_data().data, rows.data(), rows.size()); + bitmap_filter_insert_batch(column, start); } else { - for (int index : rows) { - insert(column->get_data_at(index)); - } + insert_fixed_len(column, start); } } - void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, - const std::vector<int>& rows) { + void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t start) { std::vector<const BitmapValue*> bitmaps; - auto* col = assert_cast<const vectorized::ColumnComplexType<BitmapValue>*>(column.get()); - for (int index : rows) { - bitmaps.push_back(&(col->get_data()[index])); + if (column->is_nullable()) { + const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); + const auto& col = + assert_cast<const vectorized::ColumnBitmap&>(nullable->get_nested_column()); + const auto& nullmap = + assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) + .get_data(); + for (size_t i = start; i < column->size(); i++) { + if (!nullmap[i]) { + bitmaps.push_back(&(col.get_data()[i])); + } + } + } else { + const auto* col = assert_cast<const vectorized::ColumnBitmap*>(column.get()); + for (size_t i = start; i < column->size(); i++) { + bitmaps.push_back(&(col->get_data()[i])); + } } _context.bitmap_filter_func->insert_many(bitmaps); } @@ -1039,13 +1016,6 @@ private: bool _is_ignored_in_filter = false; std::string* _ignored_in_filter_msg = nullptr; uint32_t _filter_id; - - // When _column_return_type is invalid, _use_batch will be always false. - bool _use_batch; - - // When _use_new_hash is set to true, use the new hash method. - // This is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - const bool _use_new_hash; }; Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc, @@ -1092,10 +1062,9 @@ void IRuntimeFilter::insert(const StringRef& value) { _wrapper->insert(value); } -void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, - const std::vector<int>& rows) { +void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t start) { DCHECK(is_producer()); - _wrapper->insert_batch(column, rows); + _wrapper->insert_batch(column, start); } Status IRuntimeFilter::publish() { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index fdd3b02ad63..ed4298dda85 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -262,7 +262,7 @@ public: // only used for producer void insert(const void* data); void insert(const StringRef& data); - void insert_batch(vectorized::ColumnPtr column, const std::vector<int>& rows); + void insert_batch(vectorized::ColumnPtr column, size_t start); // publish filter // push filter to remote node or push down it to scan_node @@ -350,10 +350,6 @@ public: void update_runtime_filter_type_to_profile(); - static bool enable_use_batch(bool use_batch, PrimitiveType type) { - return use_batch && (is_int_or_bool(type) || is_float_or_double(type)); - } - int filter_id() const { return _filter_id; } static std::string to_string(RuntimeFilterType type) { diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 0f841e5a60f..01155493f87 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -172,29 +172,8 @@ public: int result_column_id = _build_expr_context[i]->get_last_result_column_id(); for (const auto* it : datas) { auto column = it->get_by_position(result_column_id).column; - - std::vector<int> indexs; - // indexs start from 1 because the first row is mocked for join hash map - if (const auto* nullable = - vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) { - column = nullable->get_nested_column_ptr(); - const uint8_t* null_map = assert_cast<const vectorized::ColumnUInt8*>( - nullable->get_null_map_column_ptr().get()) - ->get_data() - .data(); - for (int i = 1; i < column->size(); i++) { - if (null_map[i]) { - continue; - } - indexs.push_back(i); - } - } else { - for (int i = 1; i < column->size(); i++) { - indexs.push_back(i); - } - } for (auto* filter : iter->second) { - filter->insert_batch(column, indexs); + filter->insert_batch(column, 1); } } } diff --git a/be/src/exprs/runtime_filter_slots_cross.h b/be/src/exprs/runtime_filter_slots_cross.h index 4868b27a4ea..76b6085bab9 100644 --- a/be/src/exprs/runtime_filter_slots_cross.h +++ b/be/src/exprs/runtime_filter_slots_cross.h @@ -61,7 +61,7 @@ public: Status insert(vectorized::Block* block) { for (int i = 0; i < _runtime_filters.size(); ++i) { auto* filter = _runtime_filters[i]; - auto& vexpr_ctx = filter_src_expr_ctxs[i]; + const auto& vexpr_ctx = filter_src_expr_ctxs[i]; int result_column_id = -1; RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id)); @@ -70,25 +70,7 @@ public: block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); - auto& column = block->get_by_position(result_column_id).column; - if (auto* nullable = - vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) { - auto& column_nested = nullable->get_nested_column_ptr(); - auto& column_nullmap = nullable->get_null_map_column_ptr(); - std::vector<int> indexs; - for (int row_index = 0; row_index < column->size(); ++row_index) { - if (assert_cast<const vectorized::ColumnUInt8*>(column_nullmap.get()) - ->get_bool(row_index)) { - continue; - } - indexs.push_back(row_index); - } - filter->insert_batch(column_nested, indexs); - } else { - std::vector<int> rows(column->size()); - std::iota(rows.begin(), rows.end(), 0); - filter->insert_batch(column, rows); - } + filter->insert_batch(block->get_by_position(result_column_id).column, 0); } return Status::OK(); } @@ -100,7 +82,7 @@ public: return Status::OK(); } - bool empty() { return !_runtime_filters.size(); } + bool empty() { return _runtime_filters.empty(); } private: const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs; diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h index d2816be9966..87f5ff266c3 100644 --- a/be/src/olap/bloom_filter_predicate.h +++ b/be/src/olap/bloom_filter_predicate.h @@ -76,61 +76,17 @@ private: uint16_t new_size = 0; if (column.is_column_dictionary()) { - auto* dict_col = reinterpret_cast<const vectorized::ColumnDictI32*>(&column); - if (_be_exec_version >= 2) { - for (uint16_t i = 0; i < size; i++) { - uint16_t idx = sel[i]; - sel[new_size] = idx; - if constexpr (is_nullable) { - new_size += !null_map[idx] && _specific_filter->find_uint32_t( - dict_col->get_crc32_hash_value(idx)); - } else { - new_size += _specific_filter->find_uint32_t( - dict_col->get_crc32_hash_value(idx)); - } - } - } else { - for (uint16_t i = 0; i < size; i++) { - uint16_t idx = sel[i]; - sel[new_size] = idx; - if constexpr (is_nullable) { - new_size += !null_map[idx] && - _specific_filter->find_uint32_t(dict_col->get_hash_value(idx)); - } else { - new_size += _specific_filter->find_uint32_t(dict_col->get_hash_value(idx)); - } - } - } - } else if (is_string_type(T) && _be_exec_version >= 2) { - auto& pred_col = - reinterpret_cast< - const vectorized::PredicateColumnType<PredicateEvaluateType<T>>*>( - &column) - ->get_data(); - - auto pred_col_data = pred_col.data(); - const bool is_dense_column = pred_col.size() == size; + const auto* dict_col = reinterpret_cast<const vectorized::ColumnDictI32*>(&column); for (uint16_t i = 0; i < size; i++) { - uint16_t idx = is_dense_column ? i : sel[i]; + uint16_t idx = sel[i]; + sel[new_size] = idx; if constexpr (is_nullable) { - if (!null_map[idx] && - _specific_filter->find_crc32_hash(get_cell_value(pred_col_data[idx]))) { - sel[new_size++] = idx; - } + new_size += !null_map[idx] && + _specific_filter->find_uint32_t(dict_col->get_hash_value(idx)); } else { - if (_specific_filter->find_crc32_hash(get_cell_value(pred_col_data[idx]))) { - sel[new_size++] = idx; - } + new_size += _specific_filter->find_uint32_t(dict_col->get_hash_value(idx)); } } - } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, T)) { - const auto& data = - reinterpret_cast< - const vectorized::PredicateColumnType<PredicateEvaluateType<T>>*>( - &column) - ->get_data(); - new_size = _specific_filter->find_fixed_len_olap_engine((char*)data.data(), null_map, - sel, size, data.size() != size); } else { auto& pred_col = reinterpret_cast< @@ -177,8 +133,8 @@ uint16_t BloomFilterColumnPredicate<T>::evaluate(const vectorized::IColumn& colu return size; } if (column.is_nullable()) { - auto* nullable_col = reinterpret_cast<const vectorized::ColumnNullable*>(&column); - auto& null_map_data = nullable_col->get_null_map_column().get_data(); + const auto* nullable_col = reinterpret_cast<const vectorized::ColumnNullable*>(&column); + const auto& null_map_data = nullable_col->get_null_map_column().get_data(); new_size = evaluate<true>(nullable_col->get_nested_column(), null_map_data.data(), sel, size); } else { diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h index 1f107e629f4..1b7ef90f61b 100644 --- a/be/src/vec/columns/column_dictionary.h +++ b/be/src/vec/columns/column_dictionary.h @@ -283,9 +283,7 @@ public: } uint32_t get_hash_value(uint32_t idx) const { return _dict.get_hash_value(_codes[idx], _type); } - uint32_t get_crc32_hash_value(uint32_t idx) const { - return _dict.get_crc32_hash_value(_codes[idx], _type); - } + template <typename HybridSetType> void find_codes(const HybridSetType* values, std::vector<vectorized::UInt8>& selected) const { return _dict.find_codes(values, selected); @@ -384,31 +382,6 @@ public: } inline uint32_t get_hash_value(T code, FieldType type) const { - if (_compute_hash_value_flags[code]) { - return _hash_values[code]; - } else { - auto& sv = (*_dict_data)[code]; - // The char data is stored in the disk with the schema length, - // and zeros are filled if the length is insufficient - - // When reading data, use shrink_char_type_column_suffix_zero(_char_type_idx) - // Remove the suffix 0 - // When writing data, use the CharField::consume function to fill in the trailing 0. - - // For dictionary data of char type, sv.size is the schema length, - // so use strnlen to remove the 0 at the end to get the actual length. - int32_t len = sv.size; - if (type == FieldType::OLAP_FIELD_TYPE_CHAR) { - len = strnlen(sv.data, sv.size); - } - uint32_t hash_val = HashUtil::murmur_hash3_32(sv.data, len, 0); - _hash_values[code] = hash_val; - _compute_hash_value_flags[code] = 1; - return _hash_values[code]; - } - } - - inline uint32_t get_crc32_hash_value(T code, FieldType type) const { if (_compute_hash_value_flags[code]) { return _hash_values[code]; } else { diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index c26234b4e22..07528b857fa 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -20,6 +20,8 @@ #pragma once +#include <gen_cpp/PlanNodes_types.h> + #include <span> #include "vec/common/hash_table/hash.h" @@ -232,6 +234,8 @@ public: uint32_t get_bucket_size() const { return bucket_size; } + size_t size() const { return next.size(); } + void build(const Key* __restrict keys, const uint32_t* __restrict bucket_nums, size_t num_elem) { build_keys = keys; diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp index 06bd21a6eb3..176ecb219ce 100644 --- a/be/src/vec/exprs/vbloom_predicate.cpp +++ b/be/src/vec/exprs/vbloom_predicate.cpp @@ -88,41 +88,16 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result block->get_by_position(arguments[0]).column->convert_to_full_column_if_const(); size_t sz = argument_column->size(); res_data_column->resize(sz); - auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data(); + auto* ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data(); auto type = WhichDataType(remove_nullable(block->get_by_position(arguments[0]).type)); if (type.is_string_or_fixed_string()) { - // When _be_exec_version is equal to or greater than 2, we use the new hash method. - // This is only to be used if the be_exec_version may be less than 2. If updated, please delete it. - if (_be_exec_version >= 2) { - for (size_t i = 0; i < sz; i++) { - /// TODO: remove virtual function call in get_data_at to improve performance - auto ele = argument_column->get_data_at(i); - const StringRef v(ele.data, ele.size); - ptr[i] = _filter->find_crc32_hash(reinterpret_cast<const void*>(&v)); - } - } else { - for (size_t i = 0; i < sz; i++) { - auto ele = argument_column->get_data_at(i); - const StringRef v(ele.data, ele.size); - ptr[i] = _filter->find(reinterpret_cast<const void*>(&v)); - } - } - } else if (_be_exec_version > 0 && (type.is_int_or_uint() || type.is_float())) { - if (argument_column->is_nullable()) { - auto column_nested = reinterpret_cast<const ColumnNullable*>(argument_column.get()) - ->get_nested_column_ptr(); - auto column_nullmap = reinterpret_cast<const ColumnNullable*>(argument_column.get()) - ->get_null_map_column_ptr(); - _filter->find_fixed_len(column_nested->get_raw_data().data, - (uint8*)column_nullmap->get_raw_data().data, sz, ptr); - } else { - _filter->find_fixed_len(argument_column->get_raw_data().data, nullptr, sz, ptr); - } - } else { for (size_t i = 0; i < sz; i++) { - ptr[i] = _filter->find( - reinterpret_cast<const void*>(argument_column->get_data_at(i).data)); + auto ele = argument_column->get_data_at(i); + const StringRef v(ele.data, ele.size); + ptr[i] = _filter->find(reinterpret_cast<const void*>(&v)); } + } else { + _filter->find_fixed_len(argument_column, ptr); } if (_data_type->is_nullable()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org