This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ba4767eef2 [improvement](scan) make global runtime filter support in-list filter (#29394)
4ba4767eef2 is described below

commit 4ba4767eef28eca4d1c64fbf45a9c2ebc594d2e1
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Thu Jan 4 09:12:30 2024 +0800

    [improvement](scan) make global runtime filter support in-list filter (#29394)
---
 be/src/exprs/runtime_filter.cpp                    | 45 ++++++++++++++--------
 be/src/exprs/runtime_filter.h                      |  4 +-
 be/src/exprs/runtime_filter_slots.h                |  5 +--
 .../processor/post/RuntimeFilterGenerator.java     |  6 +--
 4 files changed, 32 insertions(+), 28 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index f52a9574bf6..bf09adc53f8 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -453,9 +453,11 @@ public:
                           std::vector<vectorized::VExprSPtr>& push_exprs, 
const TExpr& probe_expr);
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
-        bool can_not_merge_in_or_bloom = _filter_type == 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
-                                         (wrapper->_filter_type != 
RuntimeFilterType::IN_FILTER &&
-                                          wrapper->_filter_type != 
RuntimeFilterType::BLOOM_FILTER);
+        bool can_not_merge_in_or_bloom =
+                _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+                (wrapper->_filter_type != RuntimeFilterType::IN_FILTER &&
+                 wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER &&
+                 wrapper->_filter_type != 
RuntimeFilterType::IN_OR_BLOOM_FILTER);
 
         bool can_not_merge_other = _filter_type != 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
                                    _filter_type != wrapper->_filter_type;
@@ -513,8 +515,15 @@ public:
         case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
             auto real_filter_type = _is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER
                                                     : 
RuntimeFilterType::IN_FILTER;
+
+            auto other_filter_type = wrapper->_filter_type;
+            if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+                other_filter_type = wrapper->_is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER
+                                                             : 
RuntimeFilterType::IN_FILTER;
+            }
+
             if (real_filter_type == RuntimeFilterType::IN_FILTER) {
-                if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { 
// in merge in
+                if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in 
merge in
                     CHECK(!wrapper->_is_ignored_in_filter)
                             << " can not ignore merge runtime filter(in filter 
id "
                             << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
@@ -526,7 +535,6 @@ public:
                                    << ") >= max_in_num(" << _max_in_num << ")";
                         change_to_bloom_filter();
                     }
-                    // in merge bloom filter
                 } else {
                     VLOG_DEBUG << " change runtime filter to bloom filter(id=" 
<< _filter_id
                                << ") because: already exist a bloom filter";
@@ -535,8 +543,7 @@ public:
                             wrapper->_context.bloom_filter_func.get()));
                 }
             } else {
-                if (wrapper->_filter_type ==
-                    RuntimeFilterType::IN_FILTER) { // bloom filter merge in
+                if (other_filter_type == RuntimeFilterType::IN_FILTER) { // 
bloom filter merge in
                     CHECK(!wrapper->_is_ignored_in_filter)
                             << " can not ignore merge runtime filter(in filter 
id "
                             << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
@@ -1157,6 +1164,14 @@ void 
IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTim
     _filter_timer.push_back(timer);
 }
 
+void IRuntimeFilter::set_ignored(const std::string& msg) {
+    _is_ignored = true;
+    if (_wrapper->_filter_type == RuntimeFilterType::IN_FILTER) {
+        _wrapper->_is_ignored_in_filter = true;
+        _wrapper->_ignored_in_filter_msg = _pool->add(new std::string(msg));
+    }
+}
+
 BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
     return _wrapper->get_bloomfilter();
 }
@@ -1344,14 +1359,14 @@ void 
IRuntimeFilter::update_runtime_filter_type_to_profile() {
 
 Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
     if (!_is_ignored && wrapper->is_ignored_in_filter()) {
-        set_ignored();
-        set_ignored_msg(*(wrapper->get_ignored_in_filter_msg()));
+        std::string* msg = wrapper->get_ignored_in_filter_msg();
+        set_ignored(msg ? *msg : "");
     }
     auto origin_type = _wrapper->get_real_type();
     Status status = _wrapper->merge(wrapper);
     if (!_is_ignored && _wrapper->is_ignored_in_filter()) {
-        set_ignored();
-        set_ignored_msg(*(_wrapper->get_ignored_in_filter_msg()));
+        std::string* msg = _wrapper->get_ignored_in_filter_msg();
+        set_ignored(msg ? *msg : "");
     }
     if (origin_type != _wrapper->get_real_type()) {
         update_runtime_filter_type_to_profile();
@@ -1656,10 +1671,8 @@ bool IRuntimeFilter::is_bloomfilter() {
 
 Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
     if (param->request->has_in_filter() && 
param->request->in_filter().has_ignored_msg()) {
-        set_ignored();
         const PInFilter in_filter = param->request->in_filter();
-        auto msg = param->pool->add(new std::string(in_filter.ignored_msg()));
-        set_ignored_msg(*msg);
+        set_ignored(in_filter.ignored_msg());
     }
     std::unique_ptr<RuntimePredicateWrapper> wrapper;
     RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool, 
&wrapper));
@@ -1677,10 +1690,8 @@ Status IRuntimeFilter::update_filter(const 
UpdateRuntimeFilterParams* param) {
 Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param,
                                      int64_t start_apply) {
     if (param->request->has_in_filter() && 
param->request->in_filter().has_ignored_msg()) {
-        set_ignored();
         const PInFilter in_filter = param->request->in_filter();
-        auto msg = param->pool->add(new std::string(in_filter.ignored_msg()));
-        set_ignored_msg(*msg);
+        set_ignored(in_filter.ignored_msg());
     }
 
     std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper;
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 2673308ae77..fc324c1c1be 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -292,9 +292,7 @@ public:
     Status update_filter(const UpdateRuntimeFilterParams* param);
     Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t 
start_apply);
 
-    void set_ignored() { _is_ignored = true; }
-
-    void set_ignored_msg(std::string& msg) { _ignored_msg = msg; }
+    void set_ignored(const std::string& msg);
 
     // for ut
     bool is_bloomfilter();
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index d539e295ae8..495ac28e762 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -58,15 +58,14 @@ public:
                                 filter_id);
             }
             for (auto filter : filters) {
-                filter->set_ignored();
+                filter->set_ignored("");
                 filter->signal();
             }
             return Status::OK();
         };
 
         auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored();
-            runtime_filter->set_ignored_msg(msg);
+            runtime_filter->set_ignored(msg);
             RETURN_IF_ERROR(runtime_filter->publish());
             return Status::OK();
         };
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 45a14d3b633..12db27793d0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -369,11 +369,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
         List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values())
                 .filter(type -> (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
                 .collect(Collectors.toList());
-        if (ctx.getSessionVariable().isIgnoreStorageDataDistribution()) {
-            // If storage data distribution is ignored, we use BLOOM filter.
-            legalTypes.clear();
-            legalTypes.add(TRuntimeFilterType.BLOOM);
-        }
+
         List<EqualTo> hashJoinConjuncts = join.getEqualToConjuncts();
         for (int i = 0; i < hashJoinConjuncts.size(); i++) {
             EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to