This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c8a793ad6ae [Exec](RF) Support merge remote rf local first (#31067)
c8a793ad6ae is described below

commit c8a793ad6ae68c6ba0584f3ed216f9f6716f7ee7
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Wed Feb 21 22:38:33 2024 +0800

    [Exec](RF) Support merge remote rf local first (#31067)
---
 be/src/agent/be_exec_version_manager.h             |  1 +
 be/src/exprs/runtime_filter.cpp                    | 77 +++++++++---------
 be/src/exprs/runtime_filter.h                      | 24 ++----
 be/src/exprs/runtime_filter_slots.h                | 11 ++-
 be/src/pipeline/exec/datagen_operator.cpp          |  9 +--
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 23 +++---
 be/src/pipeline/exec/hashjoin_build_sink.h         |  2 +-
 .../exec/nested_loop_join_build_operator.cpp       |  4 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |  5 +-
 be/src/pipeline/pipeline_fragment_context.h        |  2 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 14 ++--
 .../pipeline_x/pipeline_x_fragment_context.h       |  2 +-
 be/src/runtime/fragment_mgr.cpp                    |  5 +-
 be/src/runtime/query_context.h                     |  3 +
 be/src/runtime/runtime_filter_mgr.cpp              | 91 ++++++++++++++++++----
 be/src/runtime/runtime_filter_mgr.h                | 32 ++++----
 be/src/runtime/runtime_state.cpp                   | 36 +++++++--
 be/src/runtime/runtime_state.h                     | 22 ++++--
 be/src/vec/exec/join/vhash_join_node.cpp           |  6 +-
 be/src/vec/exec/join/vnested_loop_join_node.cpp    |  4 +-
 be/src/vec/exec/runtime_filter_consumer.cpp        | 23 ++----
 be/src/vec/exec/runtime_filter_consumer.h          |  4 +-
 be/src/vec/exec/vdata_gen_scan_node.cpp            |  9 +--
 .../main/java/org/apache/doris/qe/Coordinator.java |  3 +-
 24 files changed, 243 insertions(+), 169 deletions(-)

diff --git a/be/src/agent/be_exec_version_manager.h 
b/be/src/agent/be_exec_version_manager.h
index f5213c54089..afe738684aa 100644
--- a/be/src/agent/be_exec_version_manager.h
+++ b/be/src/agent/be_exec_version_manager.h
@@ -65,6 +65,7 @@ private:
  *    d. unix_timestamp function support timestamp with float for datetimev2, 
and change nullable mode.
  *    e. change shuffle serialize/deserialize way 
  *    f. shrink some function's nullable mode.
+ *    g. do local merge of remote runtime filter
 */
 constexpr inline int BeExecVersionManager::max_be_exec_version = 3;
 constexpr inline int BeExecVersionManager::min_be_exec_version = 0;
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 2ac9ad40c5a..1ec66bf2a87 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -938,11 +938,11 @@ private:
 Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* 
pool,
                               const TRuntimeFilterDesc* desc, const 
TQueryOptions* query_options,
                               const RuntimeFilterRole role, int node_id, 
IRuntimeFilter** res,
-                              bool build_bf_exactly, bool is_global, int 
parallel_tasks) {
-    *res = pool->add(new IRuntimeFilter(state, pool, desc, is_global, 
parallel_tasks));
+                              bool build_bf_exactly, bool need_local_merge) {
+    *res = pool->add(new IRuntimeFilter(state, pool, desc, need_local_merge));
     (*res)->set_role(role);
     return (*res)->init_with_desc(desc, query_options, node_id,
-                                  is_global ? false : build_bf_exactly);
+                                  need_local_merge ? false : build_bf_exactly);
 }
 
 vectorized::SharedRuntimeFilterContext& 
IRuntimeFilter::get_shared_context_ref() {
@@ -954,47 +954,53 @@ void IRuntimeFilter::insert_batch(const 
vectorized::ColumnPtr column, size_t sta
     _wrapper->insert_batch(column, start);
 }
 
-Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, 
int* merged_num) {
-    SCOPED_TIMER(_merge_local_rf_timer);
-    std::unique_lock lock(_local_merge_mutex);
-    if (_merged_rf_num == 0) {
-        _wrapper = wrapper;
-    } else {
-        RETURN_IF_ERROR(merge_from(wrapper));
-    }
-    *merged_num = ++_merged_rf_num;
-    return Status::OK();
-}
-
 Status IRuntimeFilter::publish(bool publish_local) {
     DCHECK(is_producer());
-    if (_is_global && _has_local_target) {
-        std::vector<IRuntimeFilter*> filters;
-        
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filters(
-                _filter_id, filters));
-        // push down
-        for (auto filter : filters) {
-            int merged_num = 0;
-            RETURN_IF_ERROR(filter->merge_local_filter(_wrapper, &merged_num));
-            if (merged_num == _parallel_build_tasks) {
-                filter->update_runtime_filter_type_to_profile();
-                filter->signal();
-            }
-        }
-    } else if (_has_local_target) {
+    auto send_to_remote = [&](IRuntimeFilter* filter) {
+        TNetworkAddress addr;
+        DCHECK(_state != nullptr);
+        RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
+        return filter->push_to_remote(&addr, _opt_remote_rf);
+    };
+    auto send_to_local = [&](RuntimePredicateWrapper* wrapper) {
         std::vector<IRuntimeFilter*> filters;
         
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, 
filters));
+        DCHECK(!filters.empty());
         // push down
         for (auto filter : filters) {
-            filter->_wrapper = _wrapper;
+            filter->_wrapper = wrapper;
             filter->update_runtime_filter_type_to_profile();
             filter->signal();
         }
+        return Status::OK();
+    };
+    auto do_local_merge = [&]() {
+        LocalMergeFilters* local_merge_filters = nullptr;
+        
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
+                _filter_id, &local_merge_filters));
+        std::lock_guard l(*local_merge_filters->lock);
+        RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper));
+        local_merge_filters->merge_time--;
+        if (local_merge_filters->merge_time == 0) {
+            if (_has_local_target) {
+                
RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper));
+            } else {
+                
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0]));
+            }
+        }
+        return Status::OK();
+    };
+
+    if (_need_local_merge && _has_local_target) {
+        RETURN_IF_ERROR(do_local_merge());
+    } else if (_has_local_target) {
+        RETURN_IF_ERROR(send_to_local(_wrapper));
     } else if (!publish_local) {
-        TNetworkAddress addr;
-        DCHECK(_state != nullptr);
-        RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
-        return push_to_remote(&addr, _opt_remote_rf);
+        if (_is_broadcast_join || _state->be_exec_version < 3) {
+            RETURN_IF_ERROR(send_to_remote(this));
+        } else {
+            RETURN_IF_ERROR(do_local_merge());
+        }
     } else {
         // remote broadcast join only push onetime in build shared hash table
         // publish_local only set true on copy shared hash table
@@ -1366,9 +1372,6 @@ void IRuntimeFilter::init_profile(RuntimeProfile* 
parent_profile) {
     _profile_init = true;
     parent_profile->add_child(_profile.get(), true, nullptr);
     _profile->add_info_string("Info", _format_status());
-    if (_is_global) {
-        _merge_local_rf_timer = ADD_TIMER(_profile.get(), 
"MergeLocalRuntimeFilterTime");
-    }
     if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
         update_runtime_filter_type_to_profile();
     }
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index ae8063e5610..d853493889c 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -185,7 +185,7 @@ enum RuntimeFilterState {
 class IRuntimeFilter {
 public:
     IRuntimeFilter(RuntimeFilterParamsContext* state, ObjectPool* pool,
-                   const TRuntimeFilterDesc* desc, bool is_global = false, int 
parallel_tasks = -1)
+                   const TRuntimeFilterDesc* desc, bool need_local_merge = 
false)
             : _state(state),
               _pool(pool),
               _filter_id(desc->filter_id),
@@ -204,16 +204,14 @@ public:
               _name(fmt::format("RuntimeFilter: (id = {}, type = {})", 
_filter_id,
                                 to_string(_runtime_filter_type))),
               _profile(new RuntimeProfile(_name)),
-              _is_global(is_global),
-              _parallel_build_tasks(parallel_tasks) {}
+              _need_local_merge(need_local_merge) {}
 
     ~IRuntimeFilter() = default;
 
     static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool,
                          const TRuntimeFilterDesc* desc, const TQueryOptions* 
query_options,
                          const RuntimeFilterRole role, int node_id, 
IRuntimeFilter** res,
-                         bool build_bf_exactly = false, bool is_global = false,
-                         int parallel_tasks = 0);
+                         bool build_bf_exactly = false, bool need_local_merge 
= false);
 
     vectorized::SharedRuntimeFilterContext& get_shared_context_ref();
 
@@ -349,8 +347,6 @@ public:
 
     void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
 
-    Status merge_local_filter(RuntimePredicateWrapper* wrapper, int* 
merged_num);
-
 protected:
     // serialize _wrapper to protobuf
     void to_protobuf(PInFilter* filter);
@@ -426,18 +422,10 @@ protected:
     // parent profile
     // only effect on consumer
     std::unique_ptr<RuntimeProfile> _profile;
-    RuntimeProfile::Counter* _merge_local_rf_timer = nullptr;
     bool _opt_remote_rf;
-    // `_is_global` indicates whether this runtime filter is global on this BE.
-    // All runtime filters should be merged on each BE if it is global.
-    // This is improvement for pipelineX.
-    const bool _is_global = false;
-    std::mutex _local_merge_mutex;
-    // There are `_parallel_build_tasks` pipeline tasks to build runtime 
filter.
-    // We should call `signal` once all runtime filters are done and merged to 
one
-    // (e.g. `_merged_rf_num` is equal to `_parallel_build_tasks`).
-    int _merged_rf_num = 0;
-    const int _parallel_build_tasks = -1;
+    // `_need_local_merge` indicates whether this runtime filter is global on 
this BE.
+    // All runtime filters should be merged on each BE before push_to_remote 
or publish.
+    const bool _need_local_merge = false;
 
     std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
 };
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index 7a738b8c06d..7f34bf7f2c9 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -34,25 +34,24 @@ class VRuntimeFilterSlots {
 public:
     VRuntimeFilterSlots(
             const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
build_expr_ctxs,
-            const std::vector<IRuntimeFilter*>& runtime_filters, bool 
is_global = false)
+            const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
             : _build_expr_context(build_expr_ctxs),
               _runtime_filters(runtime_filters),
-              _is_global(is_global) {}
+              _need_local_merge(need_local_merge) {}
 
     Status init(RuntimeState* state, int64_t hash_table_size) {
         // runtime filter effect strategy
         // 1. we will ignore IN filter when hash_table_size is too big
         // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
         // is too small and IN filter has effect
-
         std::map<int, bool> has_in_filter;
 
         auto ignore_local_filter = [&](int filter_id) {
             // Now pipeline x have bug in ignore, after fix the problem enable 
ignore logic in pipeline x
-            if (_is_global) {
+            if (_need_local_merge) {
                 return Status::OK();
             }
-            auto runtime_filter_mgr = state->runtime_filter_mgr();
+            auto runtime_filter_mgr = state->local_runtime_filter_mgr();
 
             std::vector<IRuntimeFilter*> filters;
             RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
@@ -217,7 +216,7 @@ public:
 private:
     const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
_build_expr_context;
     std::vector<IRuntimeFilter*> _runtime_filters;
-    const bool _is_global = false;
+    const bool _need_local_merge = false;
     // prob_contition index -> [IRuntimeFilter]
     std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map;
 };
diff --git a/be/src/pipeline/exec/datagen_operator.cpp 
b/be/src/pipeline/exec/datagen_operator.cpp
index 916ce62aa26..418068ef9d8 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -100,13 +100,8 @@ Status DataGenLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     // TODO: use runtime filter to filte result block, maybe this node need 
derive from vscan_node.
     for (const auto& filter_desc : p._runtime_filter_descs) {
         IRuntimeFilter* runtime_filter = nullptr;
-        if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
-            
RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, state->query_options(), p.node_id(), 
&runtime_filter, false));
-        } else {
-            
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, state->query_options(), p.node_id(), 
&runtime_filter, false));
-        }
+        RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, 
false, p.node_id(),
+                                                                
&runtime_filter));
         runtime_filter->init_profile(_runtime_profile.get());
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 19de8fb7bdc..9c6eff4cda8 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -99,9 +99,9 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     _hash_table_init(state);
     _runtime_filters.resize(p._runtime_filter_descs.size());
     for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
-                p._runtime_filter_descs[i], state->query_options(), 
&_runtime_filters[i],
-                _build_expr_ctxs.size() == 1, p._use_global_rf, 
p._child_x->parallel_tasks()));
+        RETURN_IF_ERROR(state->register_producer_runtime_filter(
+                p._runtime_filter_descs[i], p._need_local_merge, 
&_runtime_filters[i],
+                _build_expr_ctxs.size() == 1));
     }
 
     return Status::OK();
@@ -370,7 +370,7 @@ void 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
 HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int 
operator_id,
                                                        const TPlanNode& tnode,
                                                        const DescriptorTbl& 
descs,
-                                                       bool use_global_rf)
+                                                       bool need_local_merge)
         : JoinBuildSinkOperatorX(pool, operator_id, tnode, descs),
           _join_distribution(tnode.hash_join_node.__isset.dist_type ? 
tnode.hash_join_node.dist_type
                                                                     : 
TJoinDistributionType::NONE),
@@ -379,7 +379,7 @@ 
HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int ope
           _partition_exprs(tnode.__isset.distribute_expr_lists && 
!_is_broadcast_join
                                    ? tnode.distribute_expr_lists[1]
                                    : std::vector<TExpr> {}),
-          _use_global_rf(use_global_rf) {}
+          _need_local_merge(need_local_merge) {}
 
 Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
     if (_is_broadcast_join) {
@@ -475,10 +475,11 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
         local_state._shared_state->build_block = 
std::make_shared<vectorized::Block>(
                 local_state._build_side_mutable_block.to_block());
 
-        const bool use_global_rf =
-                
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
+        const bool need_local_merge =
+                
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge;
         RETURN_IF_ERROR(vectorized::process_runtime_filter_build(
-                state, local_state._shared_state->build_block.get(), 
&local_state, use_global_rf));
+                state, local_state._shared_state->build_block.get(), 
&local_state,
+                need_local_merge));
         RETURN_IF_ERROR(
                 local_state.process_build_block(state, 
(*local_state._shared_state->build_block)));
         if (_shared_hashtable_controller) {
@@ -528,8 +529,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
         local_state._shared_state->build_block = 
_shared_hash_table_context->block;
         local_state._shared_state->build_indexes_null =
                 _shared_hash_table_context->build_indexes_null;
-        const bool use_global_rf =
-                
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
+        const bool need_local_merge =
+                
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge;
 
         if (!_shared_hash_table_context->runtime_filters.empty()) {
             auto ret = std::visit(
@@ -545,7 +546,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                                 local_state._runtime_filter_slots =
                                         std::make_shared<VRuntimeFilterSlots>(
                                                 _build_expr_ctxs, 
local_state._runtime_filters,
-                                                use_global_rf);
+                                                need_local_merge);
 
                                 
RETURN_IF_ERROR(local_state._runtime_filter_slots->init(
                                         state, arg.hash_table->size()));
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 56a651e4210..134aba69a48 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -190,7 +190,7 @@ private:
     vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
     const std::vector<TExpr> _partition_exprs;
 
-    const bool _use_global_rf;
+    const bool _need_local_merge;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index c30bb5ad67c..8485e2c0b24 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -42,8 +42,8 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkSta
     }
     _runtime_filters.resize(p._runtime_filter_descs.size());
     for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
-                p._runtime_filter_descs[i], state->query_options(), 
&_runtime_filters[i]));
+        
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
 false,
+                                                                
&_runtime_filters[i], false));
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index e251b203794..358086e94eb 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -230,8 +230,9 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     _runtime_state = RuntimeState::create_unique(
             local_params.fragment_instance_id, request.query_id, 
request.fragment_id,
             request.query_options, _query_ctx->query_globals, _exec_env, 
_query_ctx.get());
-    if (local_params.__isset.runtime_filter_params) {
-        
_runtime_state->set_runtime_filter_params(local_params.runtime_filter_params);
+    if (idx == 0 && local_params.__isset.runtime_filter_params) {
+        _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                local_params.runtime_filter_params);
     }
 
     _runtime_state->set_task_execution_context(shared_from_this());
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 9ffcb40038c..38db8cbe8ff 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -78,7 +78,7 @@ public:
     RuntimeState* get_runtime_state() { return _runtime_state.get(); }
 
     virtual RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId 
/*fragment_instance_id*/) {
-        return _runtime_state->runtime_filter_mgr();
+        return _runtime_state->local_runtime_filter_mgr();
     }
 
     QueryContext* get_query_ctx() { return _query_ctx.get(); }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 9087f94db5e..693cb4d5c30 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -210,8 +210,9 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     _runtime_state->set_total_load_streams(request.total_load_streams);
     _runtime_state->set_num_local_sink(request.num_local_sink);
 
-    _use_global_rf = request.__isset.parallel_instances && 
(request.__isset.per_node_shared_scans &&
-                                                            
!request.per_node_shared_scans.empty());
+    _need_local_merge =
+            request.__isset.parallel_instances &&
+            (request.__isset.per_node_shared_scans && 
!request.per_node_shared_scans.empty());
     // 2. Build pipelines with operators in this fragment.
     auto root_pipeline = add_pipeline();
     RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
@@ -523,11 +524,12 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             filterparams->query_ctx = _query_ctx.get();
         }
 
-        // build runtime_filter_mgr for each instance
+        // build local_runtime_filter_mgr for each instance
         runtime_filter_mgr =
                 std::make_unique<RuntimeFilterMgr>(request.query_id, 
filterparams.get());
-        if (local_params.__isset.runtime_filter_params) {
-            
runtime_filter_mgr->set_runtime_filter_params(local_params.runtime_filter_params);
+        if (i == 0 && local_params.__isset.runtime_filter_params) {
+            _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                    local_params.runtime_filter_params);
         }
         filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
 
@@ -986,7 +988,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
 
         DataSinkOperatorXPtr sink;
         sink.reset(new HashJoinBuildSinkOperatorX(pool, 
next_sink_operator_id(), tnode, descs,
-                                                  _use_global_rf));
+                                                  _need_local_merge));
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 439b0072d72..54714dd4665 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -177,7 +177,7 @@ private:
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is 
the number of pipelines.
     std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks;
 
-    bool _use_global_rf = false;
+    bool _need_local_merge = false;
 
     // It is used to manage the lifecycle of RuntimeFilterMergeController
     std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>> 
_merge_controller_handlers;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index af7370a4c58..64f5d6416a8 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1304,7 +1304,7 @@ Status FragmentMgr::apply_filter(const 
PPublishFilterRequest* request,
         pip_context = iter->second;
 
         DCHECK(pip_context != nullptr);
-        runtime_filter_mgr = 
pip_context->get_runtime_filter_mgr(fragment_instance_id);
+        runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
     } else {
         std::unique_lock<std::mutex> lock(_lock);
         auto iter = _fragment_instance_map.find(tfragment_instance_id);
@@ -1315,7 +1315,8 @@ Status FragmentMgr::apply_filter(const 
PPublishFilterRequest* request,
         fragment_executor = iter->second;
 
         DCHECK(fragment_executor != nullptr);
-        runtime_filter_mgr = 
fragment_executor->runtime_state()->runtime_filter_mgr();
+        runtime_filter_mgr =
+                
fragment_executor->runtime_state()->get_query_ctx()->runtime_filter_mgr();
     }
 
     return runtime_filter_mgr->update_filter(request, attach_data);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index cf4321beab3..3db91ba2824 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -187,6 +187,9 @@ public:
         return _query_options.__isset.fe_process_uuid ? 
_query_options.fe_process_uuid : 0;
     }
 
+    // Global runtime filter mgr: runtime filters that have a remote target or
+    // need local merge should be registered here. Before publish() or push_to_remote(),
+    // the runtime filter should do the local merge work.
     RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); 
}
 
     TUniqueId query_id() const { return _query_id; }
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 799348ef1d3..95f65c5fc32 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -74,7 +74,7 @@ Status RuntimeFilterMgr::get_consume_filters(const int 
filter_id,
 Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& 
desc,
                                                   const TQueryOptions& 
options, int node_id,
                                                   IRuntimeFilter** 
consumer_filter,
-                                                  bool build_bf_exactly, bool 
is_global) {
+                                                  bool build_bf_exactly, bool 
need_local_merge) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
     bool has_exist = false;
@@ -89,28 +89,72 @@ Status RuntimeFilterMgr::register_consumer_filter(const 
TRuntimeFilterDesc& desc
         }
     }
 
-    // TODO: union the remote opt and global two case as one case to one judge
-    bool remote_opt_or_global = (desc.__isset.opt_remote_rf && 
desc.opt_remote_rf) || is_global;
-
     if (!has_exist) {
         IRuntimeFilter* filter;
-        RETURN_IF_ERROR(IRuntimeFilter::create(
-                _state, remote_opt_or_global ? _state->obj_pool() : &_pool, 
&desc, &options,
-                RuntimeFilterRole::CONSUMER, node_id, &filter, 
build_bf_exactly, is_global));
+        RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+                                               RuntimeFilterRole::CONSUMER, 
node_id, &filter,
+                                               build_bf_exactly, 
need_local_merge));
         _consumer_map[key].emplace_back(node_id, filter);
         *consumer_filter = filter;
-    } else if (!remote_opt_or_global) {
+    } else if (!need_local_merge) {
         return Status::InvalidArgument("filter has registered");
     }
 
     return Status::OK();
 }
 
+Status RuntimeFilterMgr::register_local_merge_producer_filter(
+        const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& 
options,
+        doris::IRuntimeFilter** producer_filter, bool build_bf_exactly) {
+    SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
+    int32_t key = desc.filter_id;
+
+    decltype(_local_merge_producer_map.end()) iter;
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        iter = _local_merge_producer_map.find(key);
+        if (iter == _local_merge_producer_map.end()) {
+            auto [new_iter, _] = _local_merge_producer_map.emplace(key, 
LocalMergeFilters {});
+            iter = new_iter;
+        }
+    }
+
+    DCHECK(_state != nullptr);
+    RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+                                           RuntimeFilterRole::PRODUCER, -1, 
producer_filter,
+                                           build_bf_exactly, true));
+    {
+        std::lock_guard<std::mutex> l(*iter->second.lock);
+        if (iter->second.filters.empty()) {
+            IRuntimeFilter* merge_filter = nullptr;
+            RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, 
&options,
+                                                   
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
+                                                   build_bf_exactly, true));
+            iter->second.filters.emplace_back(merge_filter);
+        }
+        iter->second.merge_time++;
+        iter->second.filters.emplace_back(*producer_filter);
+    }
+    return Status::OK();
+}
+
+Status RuntimeFilterMgr::get_local_merge_producer_filters(
+        int filter_id, doris::LocalMergeFilters** local_merge_filters) {
+    std::lock_guard<std::mutex> l(_lock);
+    auto iter = _local_merge_producer_map.find(filter_id);
+    if (iter == _local_merge_producer_map.end()) {
+        return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", 
filter_id);
+    }
+    *local_merge_filters = &iter->second;
+    DCHECK(!iter->second.filters.empty());
+    DCHECK_GT(iter->second.merge_time, 0);
+    return Status::OK();
+}
+
 Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& 
desc,
                                                   const TQueryOptions& options,
                                                   IRuntimeFilter** 
producer_filter,
-                                                  bool build_bf_exactly, bool 
is_global,
-                                                  int parallel_tasks) {
+                                                  bool build_bf_exactly) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
     std::lock_guard<std::mutex> l(_lock);
@@ -122,7 +166,7 @@ Status RuntimeFilterMgr::register_producer_filter(const 
TRuntimeFilterDesc& desc
     }
     RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
                                            RuntimeFilterRole::PRODUCER, -1, 
producer_filter,
-                                           build_bf_exactly, is_global, 
parallel_tasks));
+                                           build_bf_exactly));
     _producer_map.emplace(key, *producer_filter);
     return Status::OK();
 }
@@ -133,7 +177,19 @@ Status RuntimeFilterMgr::update_filter(const 
PPublishFilterRequest* request,
     UpdateRuntimeFilterParams params(request, data, &_pool);
     int filter_id = request->filter_id();
     std::vector<IRuntimeFilter*> filters;
-    RETURN_IF_ERROR(get_consume_filters(filter_id, filters));
+    // Upgrade-compatibility path that prevents infinite waiting: this is the
+    // old way of updating filters; delete it after the upgrade is complete.
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        auto iter = _consumer_map.find(filter_id);
+        if (iter == _consumer_map.end()) {
+            return Status::InvalidArgument("unknown filter: {}, role: 
CONSUMER.", filter_id);
+        }
+        for (auto& holder : iter->second) {
+            filters.emplace_back(holder.filter);
+        }
+        iter->second.clear();
+    }
     for (auto filter : filters) {
         RETURN_IF_ERROR(filter->update_filter(&params));
     }
@@ -143,8 +199,11 @@ Status RuntimeFilterMgr::update_filter(const 
PPublishFilterRequest* request,
 
 void RuntimeFilterMgr::set_runtime_filter_params(
         const TRuntimeFilterParams& runtime_filter_params) {
-    this->_merge_addr = runtime_filter_params.runtime_filter_merge_addr;
-    this->_has_merge_addr = true;
+    std::lock_guard l(_lock);
+    if (!_has_merge_addr) {
+        _merge_addr = runtime_filter_params.runtime_filter_merge_addr;
+        _has_merge_addr = true;
+    }
 }
 
 Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) {
@@ -454,7 +513,7 @@ RuntimeFilterParamsContext* 
RuntimeFilterParamsContext::create(RuntimeState* sta
     params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms();
     params->enable_pipeline_exec = state->enable_pipeline_exec();
     params->execution_timeout = state->execution_timeout();
-    params->runtime_filter_mgr = state->runtime_filter_mgr();
+    params->runtime_filter_mgr = state->local_runtime_filter_mgr();
     params->exec_env = state->exec_env();
     params->query_id.set_hi(state->query_id().hi);
     params->query_id.set_lo(state->query_id().lo);
@@ -479,8 +538,6 @@ RuntimeFilterParamsContext* 
RuntimeFilterParamsContext::create(QueryContext* que
 
     params->be_exec_version = query_ctx->be_exec_version();
     params->query_ctx = query_ctx;
-    params->_obj_pool = &query_ctx->obj_pool;
-    params->_is_global = true;
     return params;
 }
 
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index 7caea8011d2..c9b455bc107 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -53,6 +53,12 @@ class QueryContext;
 struct RuntimeFilterParamsContext;
 class ExecEnv;
 
+struct LocalMergeFilters {
+    std::unique_ptr<std::mutex> lock = std::make_unique<std::mutex>();
+    int merge_time = 0;
+    std::vector<IRuntimeFilter*> filters;
+};
+
 /// producer:
 /// Filter filter;
 /// get_filter(filter_id, &filter);
@@ -76,10 +82,18 @@ public:
     // register filter
     Status register_consumer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
                                     int node_id, IRuntimeFilter** 
consumer_filter,
-                                    bool build_bf_exactly = false, bool 
is_global = false);
+                                    bool build_bf_exactly = false, bool 
need_local_merge = false);
+
+    Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc,
+                                                const TQueryOptions& options,
+                                                IRuntimeFilter** 
producer_filter,
+                                                bool build_bf_exactly = false);
+
+    Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** 
local_merge_filters);
+
     Status register_producer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
-                                    IRuntimeFilter** producer_filter, bool 
build_bf_exactly = false,
-                                    bool is_global = false, int parallel_tasks 
= 0);
+                                    IRuntimeFilter** producer_filter,
+                                    bool build_bf_exactly = false);
 
     // update filter by remote
     Status update_filter(const PPublishFilterRequest* request,
@@ -100,6 +114,7 @@ private:
     /// TODO: should it need protected by a mutex?
     std::map<int32_t, std::vector<ConsumerFilterHolder>> _consumer_map;
     std::map<int32_t, IRuntimeFilter*> _producer_map;
+    std::map<int32_t, LocalMergeFilters> _local_merge_producer_map;
 
     RuntimeFilterParamsContext* _state = nullptr;
     std::unique_ptr<MemTracker> _tracker;
@@ -257,15 +272,6 @@ struct RuntimeFilterParamsContext {
     int be_exec_version;
     QueryContext* query_ctx;
     QueryContext* get_query_ctx() const { return query_ctx; }
-    ObjectPool* _obj_pool;
-    bool _is_global = false;
-    PUniqueId fragment_instance_id() const {
-        DCHECK(!_is_global);
-        return _fragment_instance_id;
-    }
-    ObjectPool* obj_pool() const {
-        DCHECK(_is_global);
-        return _obj_pool;
-    }
+    PUniqueId fragment_instance_id() const { return _fragment_instance_id; }
 };
 } // namespace doris
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index d25d914147b..8762366fe04 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -101,7 +101,8 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& 
fragment_exec_params,
     _runtime_filter_mgr.reset(new 
RuntimeFilterMgr(fragment_exec_params.query_id,
                                                    
RuntimeFilterParamsContext::create(this)));
     if (fragment_exec_params.__isset.runtime_filter_params) {
-        
_runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params);
+        _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                fragment_exec_params.runtime_filter_params);
     }
 }
 
@@ -305,11 +306,6 @@ Status RuntimeState::init(const TUniqueId& 
fragment_instance_id, const TQueryOpt
     return Status::OK();
 }
 
-void RuntimeState::set_runtime_filter_params(
-        const TRuntimeFilterParams& runtime_filter_params) const {
-    _runtime_filter_mgr->set_runtime_filter_params(runtime_filter_params);
-}
-
 void RuntimeState::init_mem_trackers(const TUniqueId& id, const std::string& 
name) {
     _query_mem_tracker = std::make_shared<MemTrackerLimiter>(
             MemTrackerLimiter::Type::EXPERIMENTAL, fmt::format("{}#Id={}", 
name, print_id(id)));
@@ -506,4 +502,32 @@ bool RuntimeState::enable_page_cache() const {
            (_query_options.__isset.enable_page_cache && 
_query_options.enable_page_cache);
 }
 
+RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() {
+    return _query_ctx->runtime_filter_mgr();
+}
+
+Status RuntimeState::register_producer_runtime_filter(const 
doris::TRuntimeFilterDesc& desc,
+                                                      bool need_local_merge,
+                                                      doris::IRuntimeFilter** 
producer_filter,
+                                                      bool build_bf_exactly) {
+    if (desc.has_remote_targets || need_local_merge) {
+        return 
global_runtime_filter_mgr()->register_local_merge_producer_filter(
+                desc, query_options(), producer_filter, build_bf_exactly);
+    } else {
+        return local_runtime_filter_mgr()->register_producer_filter(
+                desc, query_options(), producer_filter, build_bf_exactly);
+    }
+}
+
+Status RuntimeState::register_consumer_runtime_filter(const 
doris::TRuntimeFilterDesc& desc,
+                                                      bool need_local_merge, 
int node_id,
+                                                      doris::IRuntimeFilter** 
consumer_filter) {
+    if (desc.has_remote_targets || need_local_merge) {
+        return global_runtime_filter_mgr()->register_consumer_filter(desc, 
query_options(), node_id,
+                                                                     
consumer_filter, false, true);
+    } else {
+        return local_runtime_filter_mgr()->register_consumer_filter(desc, 
query_options(), node_id,
+                                                                    
consumer_filter, false, false);
+    }
+}
 } // end namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cdc7e83f042..03b518e5c34 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -44,6 +44,7 @@
 #include "util/runtime_profile.h"
 
 namespace doris {
+class IRuntimeFilter;
 
 namespace pipeline {
 class PipelineXLocalStateBase;
@@ -98,8 +99,6 @@ public:
     Status init(const TUniqueId& fragment_instance_id, const TQueryOptions& 
query_options,
                 const TQueryGlobals& query_globals, ExecEnv* exec_env);
 
-    void set_runtime_filter_params(const TRuntimeFilterParams& 
runtime_filter_params) const;
-
     // for ut and non-query.
     void set_exec_env(ExecEnv* exec_env) { _exec_env = exec_env; }
     void init_mem_trackers(const TUniqueId& id = TUniqueId(), const 
std::string& name = "unknown");
@@ -454,7 +453,10 @@ public:
     // if load mem limit is not set, or is zero, using query mem limit instead.
     int64_t get_load_mem_limit();
 
-    RuntimeFilterMgr* runtime_filter_mgr() {
+    // Local runtime filter mgr: runtime filters that have no remote target
+    // and need no local merge should be registered here. When the instance
+    // finishes executing, this mgr can release the memory of those filters.
+    RuntimeFilterMgr* local_runtime_filter_mgr() {
         if (_pipeline_x_runtime_filter_mgr) {
             return _pipeline_x_runtime_filter_mgr;
         } else {
@@ -462,6 +464,8 @@ public:
         }
     }
 
+    RuntimeFilterMgr* global_runtime_filter_mgr();
+
     void set_pipeline_x_runtime_filter_mgr(RuntimeFilterMgr* 
pipeline_x_runtime_filter_mgr) {
         _pipeline_x_runtime_filter_mgr = pipeline_x_runtime_filter_mgr;
     }
@@ -567,6 +571,15 @@ public:
         return _task_execution_context;
     }
 
+    Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& 
desc,
+                                            bool need_local_merge,
+                                            doris::IRuntimeFilter** 
producer_filter,
+                                            bool build_bf_exactly);
+
+    Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& 
desc,
+                                            bool need_local_merge, int node_id,
+                                            doris::IRuntimeFilter** 
producer_filter);
+
 private:
     Status create_error_log_file();
 
@@ -595,9 +608,6 @@ private:
     // owned by PipelineXFragmentContext
     RuntimeFilterMgr* _pipeline_x_runtime_filter_mgr = nullptr;
 
-    // Protects _data_stream_recvrs_pool
-    std::mutex _data_stream_recvrs_lock;
-
     // Data stream receivers created by a plan fragment are gathered here to 
make sure
     // they are destroyed before _obj_pool (class members are destroyed in 
reverse order).
     // Receivers depend on the descriptor table and we need to guarantee that 
their control
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 67dffa4b203..de3b63371ee 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -180,9 +180,9 @@ Status HashJoinNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 #endif
 
     for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
-                _runtime_filter_descs[i], state->query_options(), 
&_runtime_filters[i],
-                _probe_expr_ctxs.size() == 1));
+        
RETURN_IF_ERROR(state->register_producer_runtime_filter(_runtime_filter_descs[i],
 false,
+                                                                
&_runtime_filters[i],
+                                                                
_probe_expr_ctxs.size() == 1));
     }
 
     // init left/right output slots flags, only column of slot_id in 
_hash_output_slot_ids need
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 150068096ba..ad168ba9c86 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -118,8 +118,8 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     std::vector<TExpr> filter_src_exprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
         filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
-                _runtime_filter_descs[i], state->query_options(), 
&_runtime_filters[i]));
+        
RETURN_IF_ERROR(state->register_producer_runtime_filter(_runtime_filter_descs[i],
 false,
+                                                                
&_runtime_filters[i], false));
     }
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs, 
_filter_src_expr_ctxs));
     return Status::OK();
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index eba11dac45d..e683c4f2be0 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -30,9 +30,9 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t 
filter_id,
     _blocked_by_rf = std::make_shared<std::atomic_bool>(false);
 }
 
-Status RuntimeFilterConsumer::init(RuntimeState* state, bool is_global) {
+Status RuntimeFilterConsumer::init(RuntimeState* state, bool need_local_merge) 
{
     _state = state;
-    RETURN_IF_ERROR(_register_runtime_filter(is_global));
+    RETURN_IF_ERROR(_register_runtime_filter(need_local_merge));
     return Status::OK();
 }
 
@@ -45,28 +45,15 @@ void RuntimeFilterConsumer::_init_profile(RuntimeProfile* 
profile) {
     profile->add_info_string("RuntimeFilters: ", ss.str());
 }
 
-Status RuntimeFilterConsumer::_register_runtime_filter(bool is_global) {
+Status RuntimeFilterConsumer::_register_runtime_filter(bool need_local_merge) {
     int filter_size = _runtime_filter_descs.size();
     _runtime_filter_ctxs.reserve(filter_size);
     _runtime_filter_ready_flag.reserve(filter_size);
     for (int i = 0; i < filter_size; ++i) {
         IRuntimeFilter* runtime_filter = nullptr;
         const auto& filter_desc = _runtime_filter_descs[i];
-        if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
-            DCHECK(filter_desc.has_remote_targets);
-            
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, _state->query_options(), _filter_id, 
&runtime_filter, false,
-                    is_global));
-        } else if (is_global) {
-            // For pipelineX engine, runtime filter is global iff data 
distribution is ignored.
-            
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, _state->query_options(), _filter_id, 
&runtime_filter, false,
-                    is_global));
-        } else {
-            
RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, _state->query_options(), _filter_id, 
&runtime_filter, false,
-                    is_global));
-        }
+        RETURN_IF_ERROR(_state->register_consumer_runtime_filter(filter_desc, 
need_local_merge,
+                                                                 _filter_id, 
&runtime_filter));
         _runtime_filter_ctxs.emplace_back(runtime_filter);
         _runtime_filter_ready_flag.emplace_back(false);
     }
diff --git a/be/src/vec/exec/runtime_filter_consumer.h 
b/be/src/vec/exec/runtime_filter_consumer.h
index 15b9455ac56..b8513e666bc 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -30,7 +30,7 @@ public:
                           const RowDescriptor& row_descriptor, 
VExprContextSPtrs& conjuncts);
     ~RuntimeFilterConsumer() = default;
 
-    Status init(RuntimeState* state, bool is_global = false);
+    Status init(RuntimeState* state, bool need_local_merge = false);
 
     // Try to append late arrived runtime filters.
     // Return num of filters which are applied already.
@@ -42,7 +42,7 @@ public:
 
 protected:
     // Register and get all runtime filters at Init phase.
-    Status _register_runtime_filter(bool is_global);
+    Status _register_runtime_filter(bool need_local_merge);
     // Get all arrived runtime filters at Open phase.
     Status _acquire_runtime_filter();
     // Append late-arrival runtime filters to the vconjunct_ctx.
diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp 
b/be/src/vec/exec/vdata_gen_scan_node.cpp
index 42f6250a030..13d19921b03 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.cpp
+++ b/be/src/vec/exec/vdata_gen_scan_node.cpp
@@ -81,13 +81,8 @@ Status VDataGenFunctionScanNode::prepare(RuntimeState* 
state) {
     // TODO: use runtime filter to filte result block, maybe this node need 
derive from vscan_node.
     for (const auto& filter_desc : _runtime_filter_descs) {
         IRuntimeFilter* runtime_filter = nullptr;
-        if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
-            
RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, state->query_options(), id(), 
&runtime_filter, false));
-        } else {
-            
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, state->query_options(), id(), 
&runtime_filter, false));
-        }
+        RETURN_IF_ERROR(
+                state->register_consumer_runtime_filter(filter_desc, false, 
id(), &runtime_filter));
         runtime_filter->init_profile(_runtime_profile.get());
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ebd6b240028..1093600a485 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2153,7 +2153,8 @@ public class Coordinator implements CoordInterface {
             }
 
             for (RuntimeFilterId rid : fragment.getBuilderRuntimeFilterIds()) {
-                ridToBuilderNum.merge(rid, params.instanceExecParams.size(), 
Integer::sum);
+                ridToBuilderNum.merge(rid,
+                        (int) params.instanceExecParams.stream().map(ins -> 
ins.host).distinct().count(), Integer::sum);
             }
         }
         // Use the uppermost fragment as a merged node, the uppermost fragment 
has one and only one instance


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to