This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 45645333b4d [Refactor](rf) Refactor the rf code interface to remove update filter v1 (#31643)
45645333b4d is described below

commit 45645333b4d33d84d1aee92b1dfa77f0bff35216
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Sat Mar 2 09:15:32 2024 +0800

    [Refactor](rf) Refactor the rf code interface to remove update filter v1 (#31643)
---
 be/src/exprs/bitmapfilter_predicate.h |  2 +-
 be/src/exprs/bloom_filter_func.h      | 65 ++++++++++++++++++++++++-----------
 be/src/exprs/hybrid_set.h             | 16 +++++----
 be/src/exprs/minmax_predicate.h       | 10 +++---
 be/src/exprs/runtime_filter.cpp       |  5 +++
 be/src/exprs/runtime_filter.h         |  4 ++-
 be/src/vec/columns/column_nullable.h  |  2 +-
 gensrc/proto/internal_service.proto   |  3 ++
 8 files changed, 72 insertions(+), 35 deletions(-)

diff --git a/be/src/exprs/bitmapfilter_predicate.h 
b/be/src/exprs/bitmapfilter_predicate.h
index 8df488cf875..376453c0681 100644
--- a/be/src/exprs/bitmapfilter_predicate.h
+++ b/be/src/exprs/bitmapfilter_predicate.h
@@ -28,7 +28,7 @@
 namespace doris {
 
 // only used in Runtime Filter
-class BitmapFilterFuncBase : public FilterFuncBase {
+class BitmapFilterFuncBase : public RuntimeFilterFuncBase {
 public:
     virtual void insert(const void* data) = 0;
     virtual void insert_many(const std::vector<const BitmapValue*>& bitmaps) = 
0;
diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 84e6eba1e44..ce1ceb6f8f7 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -27,7 +27,10 @@ namespace doris {
 
 class BloomFilterAdaptor {
 public:
-    BloomFilterAdaptor() { _bloom_filter = 
std::make_shared<doris::BlockBloomFilter>(); }
+    BloomFilterAdaptor(bool null_aware = false) : _null_aware(null_aware) {
+        _bloom_filter = std::make_shared<doris::BlockBloomFilter>();
+    }
+
     static int64_t optimal_bit_num(int64_t expect_num, double fpp) {
         return doris::segment_v2::BloomFilter::optimal_bit_num(expect_num, 
fpp) / 8;
     }
@@ -74,12 +77,18 @@ public:
         }
     }
 
+    void set_contain_null() { _contain_null = true; }
+
+    bool contain_null() const { return _null_aware && _contain_null; }
+
 private:
+    bool _null_aware = false;
+    bool _contain_null = false;
     std::shared_ptr<doris::BlockBloomFilter> _bloom_filter;
 };
 
 // Only Used In RuntimeFilter
-class BloomFilterFuncBase : public FilterFuncBase {
+class BloomFilterFuncBase : public RuntimeFilterFuncBase {
 public:
     virtual ~BloomFilterFuncBase() = default;
 
@@ -236,10 +245,13 @@ uint16_t find_batch_olap(const BloomFilterAdaptor& 
bloom_filter, const char* dat
             for (int i = 0; i < number; i++) {
                 uint16_t idx = offsets[i];
                 if (nullmap[idx]) {
-                    continue;
-                }
-                if (!bloom_filter.test_element(get_element(data, idx))) {
-                    continue;
+                    if (!bloom_filter.contain_null()) {
+                        continue;
+                    }
+                } else {
+                    if (!bloom_filter.test_element(get_element(data, idx))) {
+                        continue;
+                    }
                 }
                 offsets[new_size++] = idx;
             }
@@ -255,10 +267,13 @@ uint16_t find_batch_olap(const BloomFilterAdaptor& 
bloom_filter, const char* dat
         } else {
             for (int i = 0; i < number; i++) {
                 if (nullmap[i]) {
-                    continue;
-                }
-                if (!bloom_filter.test_element(get_element(data, i))) {
-                    continue;
+                    if (!bloom_filter.contain_null()) {
+                        continue;
+                    }
+                } else {
+                    if (!bloom_filter.test_element(get_element(data, i))) {
+                        continue;
+                    }
                 }
                 offsets[new_size++] = i;
             }
@@ -277,6 +292,7 @@ struct CommonFindOp {
 
     void insert_batch(BloomFilterAdaptor& bloom_filter, const 
vectorized::ColumnPtr& column,
                       size_t start) const {
+        const auto size = column->size();
         if (column->is_nullable()) {
             const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
             const auto& col = nullable->get_nested_column();
@@ -285,14 +301,16 @@ struct CommonFindOp {
                             .get_data();
 
             const T* data = (T*)col.get_raw_data().data;
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < size; i++) {
                 if (!nullmap[i]) {
                     bloom_filter.add_element(*(data + i));
+                } else {
+                    bloom_filter.set_contain_null();
                 }
             }
         } else {
             const T* data = (T*)column->get_raw_data().data;
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < size; i++) {
                 bloom_filter.add_element(*(data + i));
             }
         }
@@ -315,16 +333,17 @@ struct CommonFindOp {
             data = (T*)column->get_raw_data().data;
         }
 
+        const auto size = column->size();
         if (nullmap) {
-            for (size_t i = 0; i < column->size(); i++) {
+            for (size_t i = 0; i < size; i++) {
                 if (!nullmap[i]) {
                     results[i] = bloom_filter.test_element(data[i]);
                 } else {
-                    results[i] = false;
+                    results[i] = bloom_filter.contain_null();
                 }
             }
         } else {
-            for (size_t i = 0; i < column->size(); i++) {
+            for (size_t i = 0; i < size; i++) {
                 results[i] = bloom_filter.test_element(data[i]);
             }
         }
@@ -346,14 +365,16 @@ struct StringFindOp : CommonFindOp<StringRef> {
                     assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
                             .get_data();
 
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < col.size(); i++) {
                 if (!nullmap[i]) {
                     bloom_filter.add_element(col.get_data_at(i));
+                } else {
+                    bloom_filter.set_contain_null();
                 }
             }
         } else {
             const auto& col = assert_cast<const 
vectorized::ColumnString*>(column.get());
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < col->size(); i++) {
                 bloom_filter.add_element(col->get_data_at(i));
             }
         }
@@ -368,22 +389,23 @@ struct StringFindOp : CommonFindOp<StringRef> {
             const auto& nullmap =
                     assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
                             .get_data();
+
             if (nullable->has_null()) {
-                for (size_t i = 0; i < column->size(); i++) {
+                for (size_t i = 0; i < col.size(); i++) {
                     if (!nullmap[i]) {
                         results[i] = 
bloom_filter.test_element(col.get_data_at(i));
                     } else {
-                        results[i] = false;
+                        results[i] = bloom_filter.contain_null();
                     }
                 }
             } else {
-                for (size_t i = 0; i < column->size(); i++) {
+                for (size_t i = 0; i < col.size(); i++) {
                     results[i] = bloom_filter.test_element(col.get_data_at(i));
                 }
             }
         } else {
             const auto& col = assert_cast<const 
vectorized::ColumnString*>(column.get());
-            for (size_t i = 0; i < column->size(); i++) {
+            for (size_t i = 0; i < col->size(); i++) {
                 results[i] = bloom_filter.test_element(col->get_data_at(i));
             }
         }
@@ -451,6 +473,7 @@ public:
             uint16_t idx = offsets[i];
             offsets[new_size] = idx;
             if constexpr (is_nullable) {
+                new_size += nullmap[idx] && _bloom_filter->contain_null();
                 new_size += !nullmap[idx] && 
_bloom_filter->test(column->get_hash_value(idx));
             } else {
                 new_size += _bloom_filter->test(column->get_hash_value(idx));
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 9151dc7d3bd..96e0c3f879a 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -175,7 +175,7 @@ private:
 };
 
 // TODO Maybe change void* parameter to template parameter better.
-class HybridSetBase : public FilterFuncBase {
+class HybridSetBase : public RuntimeFilterFuncBase {
 public:
     HybridSetBase() = default;
     virtual ~HybridSetBase() = default;
@@ -275,6 +275,8 @@ public:
     void insert(void* data, size_t /*unused*/) override { insert(data); }
 
     void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) 
override {
+        const auto size = column->size();
+
         if (column->is_nullable()) {
             const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
             const auto& col = nullable->get_nested_column();
@@ -283,14 +285,14 @@ public:
                             .get_data();
 
             const ElementType* data = (ElementType*)col.get_raw_data().data;
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < size; i++) {
                 if (!nullmap[i]) {
                     _set.insert(*(data + i));
                 }
             }
         } else {
             const ElementType* data = 
(ElementType*)column->get_raw_data().data;
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < size; i++) {
                 _set.insert(*(data + i));
             }
         }
@@ -412,14 +414,14 @@ public:
                     assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
                             .get_data();
 
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < nullable->size(); i++) {
                 if (!nullmap[i]) {
                     _set.insert(col.get_data_at(i).to_string());
                 }
             }
         } else {
             const auto& col = assert_cast<const 
vectorized::ColumnString*>(column.get());
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < col->size(); i++) {
                 _set.insert(col->get_data_at(i).to_string());
             }
         }
@@ -554,14 +556,14 @@ public:
                     assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
                             .get_data();
 
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < nullable->size(); i++) {
                 if (!nullmap[i]) {
                     _set.insert(col.get_data_at(i));
                 }
             }
         } else {
             const auto& col = assert_cast<const 
vectorized::ColumnString*>(column.get());
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < col->size(); i++) {
                 _set.insert(col->get_data_at(i));
             }
         }
diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h
index b9ee56a8dc1..297530dbd84 100644
--- a/be/src/exprs/minmax_predicate.h
+++ b/be/src/exprs/minmax_predicate.h
@@ -65,9 +65,10 @@ public:
     }
 
     void update_batch(const vectorized::ColumnPtr& column, size_t start) {
+        const auto size = column->size();
         if constexpr (std::is_same_v<T, StringRef>) {
             const auto& column_string = assert_cast<const 
vectorized::ColumnString&>(*column);
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < size; i++) {
                 if constexpr (NeedMin) {
                     _min = std::min(_min, column_string.get_data_at(i));
                 }
@@ -77,7 +78,7 @@ public:
             }
         } else {
             const T* data = (T*)column->get_raw_data().data;
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < size; i++) {
                 if constexpr (NeedMin) {
                     _min = std::min(_min, *(data + i));
                 }
@@ -90,9 +91,10 @@ public:
 
     void update_batch(const vectorized::ColumnPtr& column, const 
vectorized::NullMap& nullmap,
                       size_t start) {
+        const auto size = column->size();
         if constexpr (std::is_same_v<T, StringRef>) {
             const auto& column_string = assert_cast<const 
vectorized::ColumnString&>(*column);
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < size; i++) {
                 if (!nullmap[i]) {
                     if constexpr (NeedMin) {
                         _min = std::min(_min, column_string.get_data_at(i));
@@ -104,7 +106,7 @@ public:
             }
         } else {
             const T* data = (T*)column->get_raw_data().data;
-            for (size_t i = start; i < column->size(); i++) {
+            for (size_t i = start; i < size; i++) {
                 if (!nullmap[i]) {
                     if constexpr (NeedMin) {
                         _min = std::min(_min, *(data + i));
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 24c41613be4..84c82d89b38 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1026,6 +1026,8 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr, bool opt_remo
     merge_filter_request->set_filter_id(_filter_id);
     merge_filter_request->set_opt_remote_rf(opt_remote_rf);
     merge_filter_request->set_is_pipeline(_state->enable_pipeline_exec);
+    auto column_type = _wrapper->column_type();
+    merge_filter_request->set_column_type(to_proto(column_type));
     merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());
 
     Status serialize_status = serialize(merge_filter_request.get(), &data, 
&len);
@@ -1325,6 +1327,9 @@ Status IRuntimeFilter::_create_wrapper(const T* param, 
ObjectPool* pool,
     if (param->request->has_in_filter()) {
         column_type = 
to_primitive_type(param->request->in_filter().column_type());
     }
+    if (param->request->has_column_type()) {
+        column_type = to_primitive_type(param->request->column_type());
+    }
     wrapper->reset(new RuntimePredicateWrapper(pool, column_type, 
get_type(filter_type),
                                                param->request->filter_id()));
     switch (filter_type) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 91456cccced..c83758f38ba 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -132,7 +132,8 @@ struct RuntimeFilterParams {
     bool bitmap_filter_not_in;
     bool build_bf_exactly;
 };
-struct FilterFuncBase {
+
+struct RuntimeFilterFuncBase {
 public:
     void set_filter_id(int filter_id) {
         if (_filter_id == -1) {
@@ -147,6 +148,7 @@ public:
 private:
     int _filter_id = -1;
 };
+
 struct UpdateRuntimeFilterParams {
     UpdateRuntimeFilterParams(const PPublishFilterRequest* req,
                               butil::IOBufAsZeroCopyInputStream* data_stream, 
ObjectPool* obj_pool)
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index de01907650e..8dc4e54073a 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -86,7 +86,7 @@ public:
     const char* get_family_name() const override { return "Nullable"; }
     std::string get_name() const override { return "Nullable(" + 
nested_column->get_name() + ")"; }
     MutableColumnPtr clone_resized(size_t size) const override;
-    size_t size() const override { return nested_column->size(); }
+    size_t size() const override { return assert_cast<const 
ColumnUInt8&>(*null_map).size(); }
     bool is_null_at(size_t n) const override {
         return assert_cast<const ColumnUInt8&>(*null_map).get_data()[n] != 0;
     }
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index cf45d039522..463d04f218e 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -533,12 +533,14 @@ message PMergeFilterRequest {
     optional PInFilter in_filter = 7;
     optional bool is_pipeline = 8;
     optional bool opt_remote_rf = 9;
+    optional PColumnType column_type = 10;
 };
 
 message PMergeFilterResponse {
     required PStatus status = 1;
 };
 
+// delete PPublishFilterRequest after upgrade doris 2.1
 message PPublishFilterRequest {
     required int32 filter_id = 1;
     required PUniqueId query_id = 2;
@@ -549,6 +551,7 @@ message PPublishFilterRequest {
     optional PInFilter in_filter = 7;
     optional bool is_pipeline = 8;
     optional int64 merge_time = 9;
+    optional PColumnType column_type = 10;
 };
 
 message PPublishFilterRequestV2 {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to