This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 45645333b4d [Refactor](rf) Refactor the rf code interface to remove update filter v1 (#31643) 45645333b4d is described below commit 45645333b4d33d84d1aee92b1dfa77f0bff35216 Author: HappenLee <happen...@hotmail.com> AuthorDate: Sat Mar 2 09:15:32 2024 +0800 [Refactor](rf) Refactor the rf code interface to remove update filter v1 (#31643) --- be/src/exprs/bitmapfilter_predicate.h | 2 +- be/src/exprs/bloom_filter_func.h | 65 ++++++++++++++++++++++++----------- be/src/exprs/hybrid_set.h | 16 +++++---- be/src/exprs/minmax_predicate.h | 10 +++--- be/src/exprs/runtime_filter.cpp | 5 +++ be/src/exprs/runtime_filter.h | 4 ++- be/src/vec/columns/column_nullable.h | 2 +- gensrc/proto/internal_service.proto | 3 ++ 8 files changed, 72 insertions(+), 35 deletions(-) diff --git a/be/src/exprs/bitmapfilter_predicate.h b/be/src/exprs/bitmapfilter_predicate.h index 8df488cf875..376453c0681 100644 --- a/be/src/exprs/bitmapfilter_predicate.h +++ b/be/src/exprs/bitmapfilter_predicate.h @@ -28,7 +28,7 @@ namespace doris { // only used in Runtime Filter -class BitmapFilterFuncBase : public FilterFuncBase { +class BitmapFilterFuncBase : public RuntimeFilterFuncBase { public: virtual void insert(const void* data) = 0; virtual void insert_many(const std::vector<const BitmapValue*>& bitmaps) = 0; diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index 84e6eba1e44..ce1ceb6f8f7 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -27,7 +27,10 @@ namespace doris { class BloomFilterAdaptor { public: - BloomFilterAdaptor() { _bloom_filter = std::make_shared<doris::BlockBloomFilter>(); } + BloomFilterAdaptor(bool null_aware = false) : _null_aware(null_aware) { + _bloom_filter = std::make_shared<doris::BlockBloomFilter>(); + } + static int64_t optimal_bit_num(int64_t expect_num, double fpp) { return doris::segment_v2::BloomFilter::optimal_bit_num(expect_num, 
fpp) / 8; } @@ -74,12 +77,18 @@ public: } } + void set_contain_null() { _contain_null = true; } + + bool contain_null() const { return _null_aware && _contain_null; } + private: + bool _null_aware = false; + bool _contain_null = false; std::shared_ptr<doris::BlockBloomFilter> _bloom_filter; }; // Only Used In RuntimeFilter -class BloomFilterFuncBase : public FilterFuncBase { +class BloomFilterFuncBase : public RuntimeFilterFuncBase { public: virtual ~BloomFilterFuncBase() = default; @@ -236,10 +245,13 @@ uint16_t find_batch_olap(const BloomFilterAdaptor& bloom_filter, const char* dat for (int i = 0; i < number; i++) { uint16_t idx = offsets[i]; if (nullmap[idx]) { - continue; - } - if (!bloom_filter.test_element(get_element(data, idx))) { - continue; + if (!bloom_filter.contain_null()) { + continue; + } + } else { + if (!bloom_filter.test_element(get_element(data, idx))) { + continue; + } } offsets[new_size++] = idx; } @@ -255,10 +267,13 @@ uint16_t find_batch_olap(const BloomFilterAdaptor& bloom_filter, const char* dat } else { for (int i = 0; i < number; i++) { if (nullmap[i]) { - continue; - } - if (!bloom_filter.test_element(get_element(data, i))) { - continue; + if (!bloom_filter.contain_null()) { + continue; + } + } else { + if (!bloom_filter.test_element(get_element(data, i))) { + continue; + } } offsets[new_size++] = i; } @@ -277,6 +292,7 @@ struct CommonFindOp { void insert_batch(BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column, size_t start) const { + const auto size = column->size(); if (column->is_nullable()) { const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); const auto& col = nullable->get_nested_column(); @@ -285,14 +301,16 @@ struct CommonFindOp { .get_data(); const T* data = (T*)col.get_raw_data().data; - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < size; i++) { if (!nullmap[i]) { bloom_filter.add_element(*(data + i)); + } else { + 
bloom_filter.set_contain_null(); } } } else { const T* data = (T*)column->get_raw_data().data; - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < size; i++) { bloom_filter.add_element(*(data + i)); } } @@ -315,16 +333,17 @@ struct CommonFindOp { data = (T*)column->get_raw_data().data; } + const auto size = column->size(); if (nullmap) { - for (size_t i = 0; i < column->size(); i++) { + for (size_t i = 0; i < size; i++) { if (!nullmap[i]) { results[i] = bloom_filter.test_element(data[i]); } else { - results[i] = false; + results[i] = bloom_filter.contain_null(); } } } else { - for (size_t i = 0; i < column->size(); i++) { + for (size_t i = 0; i < size; i++) { results[i] = bloom_filter.test_element(data[i]); } } @@ -346,14 +365,16 @@ struct StringFindOp : CommonFindOp<StringRef> { assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) .get_data(); - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < col.size(); i++) { if (!nullmap[i]) { bloom_filter.add_element(col.get_data_at(i)); + } else { + bloom_filter.set_contain_null(); } } } else { const auto& col = assert_cast<const vectorized::ColumnString*>(column.get()); - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < col->size(); i++) { bloom_filter.add_element(col->get_data_at(i)); } } @@ -368,22 +389,23 @@ struct StringFindOp : CommonFindOp<StringRef> { const auto& nullmap = assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) .get_data(); + if (nullable->has_null()) { - for (size_t i = 0; i < column->size(); i++) { + for (size_t i = 0; i < col.size(); i++) { if (!nullmap[i]) { results[i] = bloom_filter.test_element(col.get_data_at(i)); } else { - results[i] = false; + results[i] = bloom_filter.contain_null(); } } } else { - for (size_t i = 0; i < column->size(); i++) { + for (size_t i = 0; i < col.size(); i++) { results[i] = bloom_filter.test_element(col.get_data_at(i)); } } } else { 
const auto& col = assert_cast<const vectorized::ColumnString*>(column.get()); - for (size_t i = 0; i < column->size(); i++) { + for (size_t i = 0; i < col->size(); i++) { results[i] = bloom_filter.test_element(col->get_data_at(i)); } } @@ -451,6 +473,7 @@ public: uint16_t idx = offsets[i]; offsets[new_size] = idx; if constexpr (is_nullable) { + new_size += nullmap[idx] && _bloom_filter->contain_null(); new_size += !nullmap[idx] && _bloom_filter->test(column->get_hash_value(idx)); } else { new_size += _bloom_filter->test(column->get_hash_value(idx)); diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h index 9151dc7d3bd..96e0c3f879a 100644 --- a/be/src/exprs/hybrid_set.h +++ b/be/src/exprs/hybrid_set.h @@ -175,7 +175,7 @@ private: }; // TODO Maybe change void* parameter to template parameter better. -class HybridSetBase : public FilterFuncBase { +class HybridSetBase : public RuntimeFilterFuncBase { public: HybridSetBase() = default; virtual ~HybridSetBase() = default; @@ -275,6 +275,8 @@ public: void insert(void* data, size_t /*unused*/) override { insert(data); } void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { + const auto size = column->size(); + if (column->is_nullable()) { const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get()); const auto& col = nullable->get_nested_column(); @@ -283,14 +285,14 @@ public: .get_data(); const ElementType* data = (ElementType*)col.get_raw_data().data; - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < size; i++) { if (!nullmap[i]) { _set.insert(*(data + i)); } } } else { const ElementType* data = (ElementType*)column->get_raw_data().data; - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < size; i++) { _set.insert(*(data + i)); } } @@ -412,14 +414,14 @@ public: assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) .get_data(); - for (size_t i = start; i < column->size(); 
i++) { + for (size_t i = start; i < nullable->size(); i++) { if (!nullmap[i]) { _set.insert(col.get_data_at(i).to_string()); } } } else { const auto& col = assert_cast<const vectorized::ColumnString*>(column.get()); - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < col->size(); i++) { _set.insert(col->get_data_at(i).to_string()); } } @@ -554,14 +556,14 @@ public: assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column()) .get_data(); - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < nullable->size(); i++) { if (!nullmap[i]) { _set.insert(col.get_data_at(i)); } } } else { const auto& col = assert_cast<const vectorized::ColumnString*>(column.get()); - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < col->size(); i++) { _set.insert(col->get_data_at(i)); } } diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h index b9ee56a8dc1..297530dbd84 100644 --- a/be/src/exprs/minmax_predicate.h +++ b/be/src/exprs/minmax_predicate.h @@ -65,9 +65,10 @@ public: } void update_batch(const vectorized::ColumnPtr& column, size_t start) { + const auto size = column->size(); if constexpr (std::is_same_v<T, StringRef>) { const auto& column_string = assert_cast<const vectorized::ColumnString&>(*column); - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < size; i++) { if constexpr (NeedMin) { _min = std::min(_min, column_string.get_data_at(i)); } @@ -77,7 +78,7 @@ public: } } else { const T* data = (T*)column->get_raw_data().data; - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < size; i++) { if constexpr (NeedMin) { _min = std::min(_min, *(data + i)); } @@ -90,9 +91,10 @@ public: void update_batch(const vectorized::ColumnPtr& column, const vectorized::NullMap& nullmap, size_t start) { + const auto size = column->size(); if constexpr (std::is_same_v<T, StringRef>) { const auto& column_string = 
assert_cast<const vectorized::ColumnString&>(*column); - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < size; i++) { if (!nullmap[i]) { if constexpr (NeedMin) { _min = std::min(_min, column_string.get_data_at(i)); @@ -104,7 +106,7 @@ public: } } else { const T* data = (T*)column->get_raw_data().data; - for (size_t i = start; i < column->size(); i++) { + for (size_t i = start; i < size; i++) { if (!nullmap[i]) { if constexpr (NeedMin) { _min = std::min(_min, *(data + i)); diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 24c41613be4..84c82d89b38 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1026,6 +1026,8 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo merge_filter_request->set_filter_id(_filter_id); merge_filter_request->set_opt_remote_rf(opt_remote_rf); merge_filter_request->set_is_pipeline(_state->enable_pipeline_exec); + auto column_type = _wrapper->column_type(); + merge_filter_request->set_column_type(to_proto(column_type)); merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms()); Status serialize_status = serialize(merge_filter_request.get(), &data, &len); @@ -1325,6 +1327,9 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool, if (param->request->has_in_filter()) { column_type = to_primitive_type(param->request->in_filter().column_type()); } + if (param->request->has_column_type()) { + column_type = to_primitive_type(param->request->column_type()); + } wrapper->reset(new RuntimePredicateWrapper(pool, column_type, get_type(filter_type), param->request->filter_id())); switch (filter_type) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 91456cccced..c83758f38ba 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -132,7 +132,8 @@ struct RuntimeFilterParams { bool bitmap_filter_not_in; bool build_bf_exactly; }; -struct 
FilterFuncBase { + +struct RuntimeFilterFuncBase { public: void set_filter_id(int filter_id) { if (_filter_id == -1) { @@ -147,6 +148,7 @@ public: private: int _filter_id = -1; }; + struct UpdateRuntimeFilterParams { UpdateRuntimeFilterParams(const PPublishFilterRequest* req, butil::IOBufAsZeroCopyInputStream* data_stream, ObjectPool* obj_pool) diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index de01907650e..8dc4e54073a 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -86,7 +86,7 @@ public: const char* get_family_name() const override { return "Nullable"; } std::string get_name() const override { return "Nullable(" + nested_column->get_name() + ")"; } MutableColumnPtr clone_resized(size_t size) const override; - size_t size() const override { return nested_column->size(); } + size_t size() const override { return assert_cast<const ColumnUInt8&>(*null_map).size(); } bool is_null_at(size_t n) const override { return assert_cast<const ColumnUInt8&>(*null_map).get_data()[n] != 0; } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index cf45d039522..463d04f218e 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -533,12 +533,14 @@ message PMergeFilterRequest { optional PInFilter in_filter = 7; optional bool is_pipeline = 8; optional bool opt_remote_rf = 9; + optional PColumnType column_type = 10; }; message PMergeFilterResponse { required PStatus status = 1; }; +// delete PPublishFilterRequest after upgrade doris 2.1 message PPublishFilterRequest { required int32 filter_id = 1; required PUniqueId query_id = 2; @@ -549,6 +551,7 @@ message PPublishFilterRequest { optional PInFilter in_filter = 7; optional bool is_pipeline = 8; optional int64 merge_time = 9; + optional PColumnType column_type = 10; }; message PPublishFilterRequestV2 { --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org