This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9b3fb4a4c73 [Bug](runtime-filter) fix wrong build_bf_exactly when sync 
filter siz… #44716 (#46965)
9b3fb4a4c73 is described below

commit 9b3fb4a4c73d9dcf152f47930d439eae4953b660
Author: Pxl <x...@selectdb.com>
AuthorDate: Tue Jan 14 17:07:40 2025 +0800

    [Bug](runtime-filter) fix wrong build_bf_exactly when sync filter siz… 
#44716 (#46965)
    
    pick from #44716
---
 be/src/exprs/runtime_filter.cpp                    | 20 +++----------
 be/src/exprs/runtime_filter.h                      |  5 ++--
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  4 +--
 .../exec/nested_loop_join_build_operator.cpp       |  2 +-
 be/src/runtime/runtime_filter_mgr.cpp              | 22 +++++++-------
 be/src/runtime/runtime_filter_mgr.h                |  8 ++---
 be/src/runtime/runtime_state.cpp                   | 13 ++++----
 be/src/runtime/runtime_state.h                     |  3 +-
 .../glue/translator/RuntimeFilterTranslator.java   |  1 +
 .../org/apache/doris/planner/RuntimeFilter.java    | 35 ++++++++++++++++++++--
 gensrc/thrift/PlanNodes.thrift                     |  4 ++-
 11 files changed, 66 insertions(+), 51 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 74ac7bd18be..e98243255bd 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -992,11 +992,10 @@ private:
 
 Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
                               const TQueryOptions* query_options, const 
RuntimeFilterRole role,
-                              int node_id, std::shared_ptr<IRuntimeFilter>* 
res,
-                              bool build_bf_exactly) {
+                              int node_id, std::shared_ptr<IRuntimeFilter>* 
res) {
     *res = std::make_shared<IRuntimeFilter>(state, desc);
     (*res)->set_role(role);
-    return (*res)->init_with_desc(desc, query_options, node_id, 
build_bf_exactly);
+    return (*res)->init_with_desc(desc, query_options, node_id);
 }
 
 RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() {
@@ -1368,7 +1367,7 @@ std::string IRuntimeFilter::formatted_state() const {
 }
 
 Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const 
TQueryOptions* options,
-                                      int node_id, bool build_bf_exactly) {
+                                      int node_id) {
     // if node_id == -1 , it shouldn't be a consumer
     DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
 
@@ -1390,21 +1389,10 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
     params.runtime_bloom_filter_max_size = 
options->__isset.runtime_bloom_filter_max_size
                                                    ? 
options->runtime_bloom_filter_max_size
                                                    : 0;
-    auto sync_filter_size = desc->__isset.sync_filter_size && 
desc->sync_filter_size;
-    // We build runtime filter by exact distinct count iff three conditions 
are met:
-    // 1. Only 1 join key
-    // 2. Bloom filter
-    // 3. Size of all bloom filters will be same (size will be sync or this is 
a broadcast join).
-    params.build_bf_exactly =
-            build_bf_exactly && (_runtime_filter_type == 
RuntimeFilterType::BLOOM_FILTER ||
-                                 _runtime_filter_type == 
RuntimeFilterType::IN_OR_BLOOM_FILTER);
 
+    params.build_bf_exactly = desc->__isset.build_bf_exactly && 
desc->build_bf_exactly;
     params.bloom_filter_size_calculated_by_ndv = 
desc->bloom_filter_size_calculated_by_ndv;
 
-    if (!sync_filter_size) {
-        params.build_bf_exactly &= !_is_broadcast_join;
-    }
-
     if (desc->__isset.bloom_filter_size_bytes) {
         params.bloom_filter_size = desc->bloom_filter_size_bytes;
     }
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index d0bc9be4145..3a3c4a6c416 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -212,8 +212,7 @@ public:
 
     static Status create(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
                          const TQueryOptions* query_options, const 
RuntimeFilterRole role,
-                         int node_id, std::shared_ptr<IRuntimeFilter>* res,
-                         bool build_bf_exactly = false);
+                         int node_id, std::shared_ptr<IRuntimeFilter>* res);
 
     RuntimeFilterContextSPtr& get_shared_context_ref();
 
@@ -259,7 +258,7 @@ public:
 
     // init filter with desc
     Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* 
options,
-                          int node_id = -1, bool build_bf_exactly = false);
+                          int node_id = -1);
 
     // serialize _wrapper to protobuf
     Status serialize(PMergeFilterRequest* request, void** data, int* len);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 3f096450204..bcad867495f 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -92,8 +92,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     _hash_table_init(state);
     _runtime_filters.resize(p._runtime_filter_descs.size());
     for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
-        RETURN_IF_ERROR(state->register_producer_runtime_filter(
-                p._runtime_filter_descs[i], &_runtime_filters[i], 
_build_expr_ctxs.size() == 1));
+        
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
+                                                                
&_runtime_filters[i]));
     }
 
     _runtime_filter_slots =
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index c9f7ee7cf5e..7b531fcd2d5 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -67,7 +67,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkSta
     _runtime_filters.resize(p._runtime_filter_descs.size());
     for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
         
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
-                                                                
&_runtime_filters[i], false));
+                                                                
&_runtime_filters[i]));
     }
     return Status::OK();
 }
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index f7e22cde144..7e9a0146b21 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -90,7 +90,7 @@ std::vector<std::shared_ptr<IRuntimeFilter>> 
RuntimeFilterMgr::get_consume_filte
 Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& 
desc,
                                                   const TQueryOptions& 
options, int node_id,
                                                   
std::shared_ptr<IRuntimeFilter>* consumer_filter,
-                                                  bool build_bf_exactly, bool 
need_local_merge) {
+                                                  bool need_local_merge) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
     bool has_exist = false;
@@ -110,7 +110,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const 
TRuntimeFilterDesc& desc
     if (!has_exist) {
         std::shared_ptr<IRuntimeFilter> filter;
         RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, 
RuntimeFilterRole::CONSUMER,
-                                               node_id, &filter, 
build_bf_exactly));
+                                               node_id, &filter));
         _consumer_map[key].emplace_back(node_id, filter);
         *consumer_filter = filter;
     } else if (!need_local_merge) {
@@ -122,7 +122,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const 
TRuntimeFilterDesc& desc
 
 Status RuntimeFilterMgr::register_local_merge_producer_filter(
         const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& 
options,
-        std::shared_ptr<IRuntimeFilter> producer_filter, bool 
build_bf_exactly) {
+        std::shared_ptr<IRuntimeFilter> producer_filter) {
     DCHECK(_is_global);
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
@@ -143,8 +143,7 @@ Status 
RuntimeFilterMgr::register_local_merge_producer_filter(
         if (iter->second.filters.empty()) {
             std::shared_ptr<IRuntimeFilter> merge_filter;
             RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
-                                                   
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
-                                                   build_bf_exactly));
+                                                   
RuntimeFilterRole::PRODUCER, -1, &merge_filter));
             merge_filter->set_ignored();
             iter->second.filters.emplace_back(merge_filter);
         }
@@ -181,10 +180,9 @@ doris::LocalMergeFilters* 
RuntimeFilterMgr::get_local_merge_producer_filters(int
     return &iter->second;
 }
 
-Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& 
desc,
-                                                  const TQueryOptions& options,
-                                                  
std::shared_ptr<IRuntimeFilter>* producer_filter,
-                                                  bool build_bf_exactly) {
+Status RuntimeFilterMgr::register_producer_filter(
+        const TRuntimeFilterDesc& desc, const TQueryOptions& options,
+        std::shared_ptr<IRuntimeFilter>* producer_filter) {
     DCHECK(!_is_global);
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
@@ -196,7 +194,7 @@ Status RuntimeFilterMgr::register_producer_filter(const 
TRuntimeFilterDesc& desc
         return Status::InvalidArgument("filter has registed");
     }
     RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, 
RuntimeFilterRole::PRODUCER, -1,
-                                           producer_filter, build_bf_exactly));
+                                           producer_filter));
     _producer_map.emplace(key, *producer_filter);
     return Status::OK();
 }
@@ -233,8 +231,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, 
runtime_filter_desc));
 
     auto filter_id = runtime_filter_desc->filter_id;
-    
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, 
query_options,
-                                                    -1, false));
+    RETURN_IF_ERROR(
+            cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, 
query_options, -1));
     cnt_val->filter->set_ignored();
     _filter_map.emplace(filter_id, cnt_val);
     return Status::OK();
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index 0a6f8318fea..9f4cf5f4e22 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -100,19 +100,17 @@ public:
     // register filter
     Status register_consumer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
                                     int node_id, 
std::shared_ptr<IRuntimeFilter>* consumer_filter,
-                                    bool build_bf_exactly = false, bool 
need_local_merge = false);
+                                    bool need_local_merge = false);
 
     Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc,
                                                 const TQueryOptions& options,
-                                                
std::shared_ptr<IRuntimeFilter> producer_filter,
-                                                bool build_bf_exactly = false);
+                                                
std::shared_ptr<IRuntimeFilter> producer_filter);
 
     Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** 
local_merge_filters);
     LocalMergeFilters* get_local_merge_producer_filters(int filter_id);
 
     Status register_producer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
-                                    std::shared_ptr<IRuntimeFilter>* 
producer_filter,
-                                    bool build_bf_exactly = false);
+                                    std::shared_ptr<IRuntimeFilter>* 
producer_filter);
 
     // update filter by remote
     void set_runtime_filter_params(const TRuntimeFilterParams& 
runtime_filter_params);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index a22ad18ce04..80b018a4a19 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -522,14 +522,13 @@ RuntimeFilterMgr* 
RuntimeState::global_runtime_filter_mgr() {
 }
 
 Status RuntimeState::register_producer_runtime_filter(
-        const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>* 
producer_filter,
-        bool build_bf_exactly) {
+        const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>* 
producer_filter) {
     // Producers are created by local runtime filter mgr and shared by global 
runtime filter manager.
     // When RF is published, consumers in both global and local RF mgr will be 
found.
-    RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
-            desc, query_options(), producer_filter, build_bf_exactly));
+    RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc, 
query_options(),
+                                                                         
producer_filter));
     
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
-            desc, query_options(), *producer_filter, build_bf_exactly));
+            desc, query_options(), *producer_filter));
     return Status::OK();
 }
 
@@ -538,10 +537,10 @@ Status RuntimeState::register_consumer_runtime_filter(
         std::shared_ptr<IRuntimeFilter>* consumer_filter) {
     if (desc.has_remote_targets || need_local_merge) {
         return global_runtime_filter_mgr()->register_consumer_filter(desc, 
query_options(), node_id,
-                                                                     
consumer_filter, false, true);
+                                                                     
consumer_filter, true);
     } else {
         return local_runtime_filter_mgr()->register_consumer_filter(desc, 
query_options(), node_id,
-                                                                    
consumer_filter, false, false);
+                                                                    
consumer_filter, false);
     }
 }
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cd9de143522..e86990ae92b 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -561,8 +561,7 @@ public:
     }
 
     Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& 
desc,
-                                            std::shared_ptr<IRuntimeFilter>* 
producer_filter,
-                                            bool build_bf_exactly);
+                                            std::shared_ptr<IRuntimeFilter>* 
producer_filter);
 
     Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& 
desc,
                                             bool need_local_merge, int node_id,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 3dbd6cfcec7..07e0af60173 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -143,6 +143,7 @@ public class RuntimeFilterTranslator {
                     targetTupleIdMapList, context.getLimits());
             if (node instanceof HashJoinNode) {
                 origFilter.setIsBroadcast(((HashJoinNode) 
node).getDistributionMode() == DistributionMode.BROADCAST);
+                origFilter.setSingleEq(((HashJoinNode) 
node).getEqJoinConjuncts().size());
             } else {
                 // nest loop join
                 origFilter.setIsBroadcast(true);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 2f3948aee16..80497798083 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -112,6 +112,8 @@ public final class RuntimeFilter {
 
     private boolean bloomFilterSizeCalculatedByNdv = false;
 
+    private boolean singleEq = false;
+
     /**
      * Internal representation of a runtime filter target.
      */
@@ -216,9 +218,36 @@ public final class RuntimeFilter {
         tFilter.setIsBroadcastJoin(isBroadcastJoin);
         tFilter.setHasLocalTargets(hasLocalTargets);
         tFilter.setHasRemoteTargets(hasRemoteTargets);
+
+        boolean hasSerialTargets = false;
         for (RuntimeFilterTarget target : targets) {
             tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), 
target.expr.treeToThrift());
+            hasSerialTargets = hasSerialTargets
+                    || (target.node.isSerialOperator() && 
target.node.fragment.useSerialSource(ConnectContext.get()));
         }
+
+        boolean enableSyncFilterSize = ConnectContext.get() != null
+                && 
ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize();
+
+        // there are two cases where there is an exchange between join and scan
+        // 1. hasRemoteTargets is true means the join probe side does at least 
one shuffle (has shuffle between join and scan)
+        // 2. hasSerialTargets is true means scan is pooled (has local shuffle 
between join and scan)
+        boolean needShuffle = hasRemoteTargets || hasSerialTargets;
+
+        // There are two cases where all instances of rf have the same size.
+        // 1. enableSyncFilterSize is true means backends will collect the 
global size and send it to every instance
+        // 2. isBroadcastJoin is true means each join node instance has the 
same full amount of data
+        boolean hasGlobalSize = enableSyncFilterSize || isBroadcastJoin;
+
+        // build runtime filter by exact distinct count if all 3 conditions 
are met:
+        // 1. there is only a single eq conjunct
+        // 2. rf type may be bf (BLOOM or IN_OR_BLOOM)
+        // 3. each filter only acts on its own instance (no shuffle needed), 
or the size of
+        // all filters will be the same
+        boolean buildBfExactly = singleEq && (runtimeFilterType == 
TRuntimeFilterType.IN_OR_BLOOM
+                || runtimeFilterType == TRuntimeFilterType.BLOOM) && 
(!needShuffle || hasGlobalSize);
+        tFilter.setBuildBfExactly(buildBfExactly);
+
         tFilter.setType(runtimeFilterType);
         tFilter.setBloomFilterSizeBytes(filterSizeBytes);
         if (runtimeFilterType.equals(TRuntimeFilterType.BITMAP)) {
@@ -239,8 +268,6 @@ public final class RuntimeFilter {
                 tFilter.setNullAware(false);
             }
         }
-        tFilter.setSyncFilterSize(ConnectContext.get() != null
-                && 
ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize());
         return tFilter;
     }
 
@@ -597,6 +624,10 @@ public final class RuntimeFilter {
         targets.add(target);
     }
 
+    public void setSingleEq(int eqJoinConjunctsNumbers) {
+        singleEq = (eqJoinConjunctsNumbers == 1);
+    }
+
     public void setIsBroadcast(boolean isBroadcast) {
         isBroadcastJoin = isBroadcast;
     }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 9aaa7076901..70c6722b9d8 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1301,7 +1301,9 @@ struct TRuntimeFilterDesc {
   // true, if join type is null aware like <=>. rf should dispose the case
   15: optional bool null_aware;
 
-  16: optional bool sync_filter_size;
+  16: optional bool sync_filter_size; // Deprecated
+  
+  17: optional bool build_bf_exactly;
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to