This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refactor_rf by this push:
     new 9fb4944c72e [refactor](runtime filter) Refine consumer (#48198)
9fb4944c72e is described below

commit 9fb4944c72e6664ee7441497f5ea10682e5aa293
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Fri Feb 21 23:39:13 2025 +0800

    [refactor](runtime filter) Refine consumer (#48198)
---
 .../exec/multi_cast_data_stream_source.cpp         |   4 +-
 be/src/pipeline/exec/scan_operator.cpp             |   2 +-
 be/src/pipeline/exec/scan_operator.h               |   4 +-
 be/src/runtime/fragment_mgr.cpp                    | 141 ++++-------
 be/src/runtime/fragment_mgr.h                      |   2 -
 be/src/runtime/query_context.h                     |   3 +
 be/src/runtime/runtime_state.cpp                   |   4 +-
 be/src/runtime_filter/role/consumer.cpp            | 152 ++++++++++-
 be/src/runtime_filter/role/consumer.h              |  41 ++-
 be/src/runtime_filter/role/merger.h                |   7 +-
 be/src/runtime_filter/role/producer.cpp            |  23 +-
 be/src/runtime_filter/role/producer.h              |   5 +-
 be/src/runtime_filter/role/runtime_filter.cpp      |   2 +-
 be/src/runtime_filter/role/runtime_filter.h        |  16 +-
 be/src/runtime_filter/runtime_filter_helper.cpp    |  26 +-
 be/src/runtime_filter/runtime_filter_helper.h      |  28 +--
 be/src/runtime_filter/runtime_filter_mgr.cpp       |  37 +--
 be/src/runtime_filter/runtime_filter_mgr.h         |  89 +------
 be/src/runtime_filter/runtime_filter_slots.cpp     |  18 --
 be/src/runtime_filter/runtime_filter_wrapper.cpp   | 280 ++++++---------------
 be/src/runtime_filter/runtime_filter_wrapper.h     | 132 ++++------
 be/src/vec/exec/scan/vscanner.cpp                  |   3 +-
 be/src/vec/exprs/vbitmap_predicate.cpp             |   2 +-
 be/src/vec/exprs/vbitmap_predicate.h               |   2 +-
 be/src/vec/exprs/vbloom_predicate.cpp              |   3 +-
 be/src/vec/exprs/vbloom_predicate.h                |   2 +-
 gensrc/proto/internal_service.proto                |   2 +-
 gensrc/thrift/PlanNodes.thrift                     |   2 +-
 28 files changed, 409 insertions(+), 623 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 796eb6345e1..ae35a1aebf9 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -29,7 +29,7 @@ 
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(Runtime
                                                                          
OperatorXBase* parent)
         : Base(state, parent),
           _helper(static_cast<Parent*>(parent)->dest_id_from_sink(), 
parent->runtime_filter_descs(),
-                  
static_cast<Parent*>(parent)->_multi_cast_output_row_descriptor, _conjuncts) {}
+                  
static_cast<Parent*>(parent)->_multi_cast_output_row_descriptor) {}
 
 Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
@@ -52,7 +52,7 @@ Status 
MultiCastDataStreamSourceLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(Base::open(state));
-    RETURN_IF_ERROR(_helper.acquire_runtime_filter());
+    RETURN_IF_ERROR(_helper.acquire_runtime_filter(_conjuncts));
     auto& p = _parent->cast<Parent>();
     _output_expr_contexts.resize(p._output_expr_contexts.size());
     for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 3b0a276fcfe..535db749d9d 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -97,7 +97,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
         RETURN_IF_ERROR(
                 p._common_expr_ctxs_push_down[i]->clone(state, 
_common_expr_ctxs_push_down[i]));
     }
-    RETURN_IF_ERROR(_helper.acquire_runtime_filter());
+    RETURN_IF_ERROR(_helper.acquire_runtime_filter(_conjuncts));
     _stale_expr_ctxs.resize(p._stale_expr_ctxs.size());
     for (size_t i = 0; i < _stale_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._stale_expr_ctxs[i]->clone(state, 
_stale_expr_ctxs[i]));
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 0fc866679c0..6cd3da930de 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -66,8 +66,8 @@ class ScanLocalStateBase : public PipelineXLocalState<> {
 public:
     ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
             : PipelineXLocalState<>(state, parent),
-              _helper(parent->node_id(), parent->runtime_filter_descs(), 
parent->row_descriptor(),
-                      _conjuncts) {}
+              _helper(parent->node_id(), parent->runtime_filter_descs(), 
parent->row_descriptor()) {
+    }
     ~ScanLocalStateBase() override = default;
 
     [[nodiscard]] virtual bool should_run_serial() const = 0;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9dc0febc892..b2780938d38 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -860,11 +860,12 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
     DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
                     { return 
Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
 
-    std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
-    RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
-            params.local_params[0], params.query_id, params.query_options, 
&handler,
-            RuntimeFilterParamsContext::create(context->get_runtime_state())));
-    if (handler) {
+    if (params.local_params[0].__isset.runtime_filter_params &&
+        
params.local_params[0].runtime_filter_params.rid_to_runtime_filter.size() > 0) {
+        auto handler = std::make_shared<RuntimeFilterMergeControllerEntity>(
+                
RuntimeFilterParamsContext::create(context->get_runtime_state()));
+        RETURN_IF_ERROR(handler->init(params.query_id, 
params.local_params[0].runtime_filter_params,
+                                      params.query_options));
         query_ctx->set_merge_controller_handler(handler);
     }
 
@@ -1285,109 +1286,75 @@ Status FragmentMgr::exec_external_plan_fragment(const 
TScanOpenParams& params,
 
 Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
                                    butil::IOBufAsZeroCopyInputStream* 
attach_data) {
-    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-
-    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
-
-    const auto& fragment_ids = request->fragment_ids();
-    {
-        for (auto fragment_id : fragment_ids) {
-            pip_context =
-                    
_pipeline_map.find({UniqueId(request->query_id()).to_thrift(), fragment_id});
-            if (pip_context == nullptr) {
-                continue;
-            }
-
-            DCHECK(pip_context != nullptr);
-            runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
-            break;
+    UniqueId queryid = request->query_id();
+    TUniqueId query_id;
+    query_id.__set_hi(queryid.hi);
+    query_id.__set_lo(queryid.lo);
+    if (auto q_ctx = get_query_ctx(query_id)) {
+        SCOPED_ATTACH_TASK(q_ctx.get());
+        RuntimeFilterMgr* runtime_filter_mgr = q_ctx->runtime_filter_mgr();
+        DCHECK(runtime_filter_mgr != nullptr);
+
+        // 1. get the target filters
+        std::vector<std::shared_ptr<RuntimeFilterConsumer>> filters =
+                runtime_filter_mgr->get_consume_filters(request->filter_id());
+
+        // 2. create the filter wrapper to replace or ignore the target filters
+        if (!filters.empty()) {
+            RETURN_IF_ERROR(filters[0]->assign(*request, attach_data));
+            std::ranges::for_each(filters, [&](auto& filter) { 
filter->signal(filters[0].get()); });
         }
-    }
-
-    if (runtime_filter_mgr == nullptr) {
+    } else {
         // all instance finished
-        return Status::OK();
-    }
-
-    SCOPED_ATTACH_TASK(pip_context->get_query_ctx());
-    // 1. get the target filters
-    std::vector<std::shared_ptr<RuntimeFilterConsumer>> filters =
-            runtime_filter_mgr->get_consume_filters(request->filter_id());
-
-    // 2. create the filter wrapper to replace or ignore the target filters
-    if (!filters.empty()) {
-        RETURN_IF_ERROR(filters[0]->assign_data_into_wrapper(*request, 
attach_data));
-
-        std::ranges::for_each(filters, [&](auto& filter) { 
filter->signal(filters[0].get()); });
     }
-
     return Status::OK();
 }
 
 Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
     UniqueId queryid = request->query_id();
-
-    std::shared_ptr<QueryContext> query_ctx;
-    {
-        TUniqueId query_id;
-        query_id.__set_hi(queryid.hi);
-        query_id.__set_lo(queryid.lo);
-        if (auto q_ctx = get_query_ctx(query_id)) {
-            query_ctx = q_ctx;
-        } else {
-            return Status::EndOfFile(
-                    "Send filter size failed: Query context (query-id: {}) not 
found, maybe "
-                    "finished",
-                    queryid.to_string());
-        }
+    TUniqueId query_id;
+    query_id.__set_hi(queryid.hi);
+    query_id.__set_lo(queryid.lo);
+    if (auto q_ctx = get_query_ctx(query_id)) {
+        return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, 
request);
+    } else {
+        return Status::EndOfFile(
+                "Send filter size failed: Query context (query-id: {}) not 
found, maybe "
+                "finished",
+                queryid.to_string());
     }
-
-    std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
-    RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
-    auto merge_status = filter_controller->send_filter_size(query_ctx, 
request);
-    return merge_status;
 }
 
 Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
     UniqueId queryid = request->query_id();
-    std::shared_ptr<QueryContext> query_ctx;
-    {
-        TUniqueId query_id;
-        query_id.__set_hi(queryid.hi);
-        query_id.__set_lo(queryid.lo);
-        if (auto q_ctx = get_query_ctx(query_id)) {
-            query_ctx = q_ctx;
-        } else {
-            return Status::EndOfFile(
-                    "Sync filter size failed: Query context (query-id: {}) 
already finished",
-                    queryid.to_string());
-        }
+    TUniqueId query_id;
+    query_id.__set_hi(queryid.hi);
+    query_id.__set_lo(queryid.lo);
+    if (auto q_ctx = get_query_ctx(query_id)) {
+        return q_ctx->runtime_filter_mgr()->sync_filter_size(request);
+    } else {
+        return Status::EndOfFile(
+                "Sync filter size failed: Query context (query-id: {}) already 
finished",
+                queryid.to_string());
     }
-    return query_ctx->runtime_filter_mgr()->sync_filter_size(request);
 }
 
 Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
                                  butil::IOBufAsZeroCopyInputStream* 
attach_data) {
     UniqueId queryid = request->query_id();
 
-    std::shared_ptr<QueryContext> query_ctx;
-    {
-        TUniqueId query_id;
-        query_id.__set_hi(queryid.hi);
-        query_id.__set_lo(queryid.lo);
-        if (auto q_ctx = get_query_ctx(query_id)) {
-            query_ctx = q_ctx;
-        } else {
-            return Status::EndOfFile(
-                    "Merge filter size failed: Query context (query-id: {}) 
already finished",
-                    queryid.to_string());
-        }
+    TUniqueId query_id;
+    query_id.__set_hi(queryid.hi);
+    query_id.__set_lo(queryid.lo);
+    if (auto q_ctx = get_query_ctx(query_id)) {
+        SCOPED_ATTACH_TASK(q_ctx.get());
+        std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
+        return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, 
attach_data);
+    } else {
+        return Status::EndOfFile(
+                "Merge filter size failed: Query context (query-id: {}) 
already finished",
+                queryid.to_string());
     }
-    SCOPED_ATTACH_TASK(query_ctx.get());
-    std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
-    RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
-    auto merge_status = filter_controller->merge(query_ctx, request, 
attach_data);
-    return merge_status;
 }
 
 void FragmentMgr::get_runtime_query_info(
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 8aaa572c041..60b13348ad4 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -218,8 +218,6 @@ private:
 
     std::shared_ptr<MetricEntity> _entity;
     UIntGauge* timeout_canceled_fragment_count = nullptr;
-
-    RuntimeFilterMergeController _runtimefilter_controller;
 };
 
 uint64_t get_fragment_executing_count();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 7056c8bd7c5..abe6b0208af 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -259,6 +259,9 @@ public:
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
         _merge_controller_handler = handler;
     }
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> 
get_merge_controller_handler() const {
+        return _merge_controller_handler;
+    }
 
     bool is_nereids() const { return _is_nereids; }
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 012113114f1..7d7d1ee2b3b 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -496,8 +496,8 @@ Status RuntimeState::register_producer_runtime_filter(
     // When RF is published, consumers in both global and local RF mgr will be 
found.
     RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
             desc, query_options(), producer_filter, parent_profile));
-    
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_filter(desc, 
query_options(),
-                                                                              
*producer_filter));
+    
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter(
+            desc, query_options(), *producer_filter));
     return Status::OK();
 }
 
diff --git a/be/src/runtime_filter/role/consumer.cpp 
b/be/src/runtime_filter/role/consumer.cpp
index 97f07702b79..d150dd41d1c 100644
--- a/be/src/runtime_filter/role/consumer.cpp
+++ b/be/src/runtime_filter/role/consumer.cpp
@@ -17,10 +17,15 @@
 
 #include "runtime_filter/role/consumer.h"
 
+#include "exprs/create_predicate_function.h"
+#include "vec/exprs/vbitmap_predicate.h"
+#include "vec/exprs/vbloom_predicate.h"
+#include "vec/exprs/vdirect_in_predicate.h"
+#include "vec/exprs/vexpr_context.h"
+
 namespace doris {
 
 Status RuntimeFilterConsumer::_apply_ready_expr(
-        std::list<vectorized::VExprContextSPtr>& probe_ctxs,
         std::vector<vectorized::VRuntimeFilterPtr>& push_exprs) {
     _check_state({State::READY});
     _set_state(State::APPLIED);
@@ -32,12 +37,12 @@ Status RuntimeFilterConsumer::_apply_ready_expr(
     }
 
     auto origin_size = push_exprs.size();
-    RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, 
_probe_expr));
+    RETURN_IF_ERROR(_get_push_exprs(push_exprs, _probe_expr));
     // The runtime filter is pushed down, adding filtering information.
     auto* expr_filtered_rows_counter =
-            ADD_COUNTER(_excution_profile, "ExprFilteredRows", TUnit::UNIT);
-    auto* expr_input_rows_counter = ADD_COUNTER(_excution_profile, 
"ExprInputRows", TUnit::UNIT);
-    auto* always_true_counter = ADD_COUNTER(_excution_profile, 
"AlwaysTruePassRows", TUnit::UNIT);
+            ADD_COUNTER(_execution_profile, "ExprFilteredRows", TUnit::UNIT);
+    auto* expr_input_rows_counter = ADD_COUNTER(_execution_profile, 
"ExprInputRows", TUnit::UNIT);
+    auto* always_true_counter = ADD_COUNTER(_execution_profile, 
"AlwaysTruePassRows", TUnit::UNIT);
     for (auto i = origin_size; i < push_exprs.size(); i++) {
         push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, 
expr_input_rows_counter,
                                               always_true_counter);
@@ -45,10 +50,9 @@ Status RuntimeFilterConsumer::_apply_ready_expr(
     return Status::OK();
 }
 
-Status 
RuntimeFilterConsumer::acquire_expr(std::list<vectorized::VExprContextSPtr>& 
probe_ctxs,
-                                           
std::vector<vectorized::VRuntimeFilterPtr>& push_exprs) {
+Status 
RuntimeFilterConsumer::acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>& 
push_exprs) {
     if (_rf_state == State::READY) {
-        RETURN_IF_ERROR(_apply_ready_expr(probe_ctxs, push_exprs));
+        RETURN_IF_ERROR(_apply_ready_expr(push_exprs));
     }
     if (_rf_state != State::APPLIED && _rf_state != State::TIMEOUT) {
         _check_state({State::NOT_READY});
@@ -73,11 +77,139 @@ void RuntimeFilterConsumer::signal(RuntimeFilter* other) {
 }
 
 std::shared_ptr<pipeline::RuntimeFilterTimer> 
RuntimeFilterConsumer::create_filter_timer(
-        std::shared_ptr<pipeline::RuntimeFilterDependency> dependencie) {
+        std::shared_ptr<pipeline::RuntimeFilterDependency> dependencies) {
     auto timer = 
std::make_shared<pipeline::RuntimeFilterTimer>(_registration_time,
-                                                                
_rf_wait_time_ms, dependencie);
+                                                                
_rf_wait_time_ms, dependencies);
     _filter_timer.push_back(timer);
     return timer;
 }
 
+Status 
RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFilterPtr>&
 container,
+                                              const TExpr& probe_expr) {
+    // TODO: `VExprContextSPtr` is not need, we should just create an expr.
+    vectorized::VExprContextSPtr probe_ctx;
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, 
probe_ctx));
+
+    auto real_filter_type = _wrapper->get_real_type();
+    bool null_aware = _wrapper->contain_null();
+    switch (real_filter_type) {
+    case RuntimeFilterType::IN_FILTER: {
+        TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+        type_desc.__set_is_nullable(false);
+        TExprNode node;
+        node.__set_type(type_desc);
+        node.__set_node_type(null_aware ? TExprNodeType::NULL_AWARE_IN_PRED
+                                        : TExprNodeType::IN_PRED);
+        node.in_predicate.__set_is_not_in(false);
+        node.__set_opcode(TExprOpcode::FILTER_IN);
+        node.__set_is_nullable(false);
+        auto in_pred = vectorized::VDirectInPredicate::create_shared(node, 
_wrapper->hybrid_set());
+        in_pred->add_child(probe_ctx->root());
+        auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
+                node, in_pred, 
get_in_list_ignore_thredhold(_wrapper->hybrid_set()->size()),
+                null_aware);
+        container.push_back(wrapper);
+        break;
+    }
+    case RuntimeFilterType::MIN_FILTER: {
+        // create min filter
+        vectorized::VExprSPtr min_pred;
+        TExprNode min_pred_node;
+        RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::GE, min_pred,
+                                              &min_pred_node, null_aware));
+        vectorized::VExprSPtr min_literal;
+        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(),
+                                       _wrapper->minmax_func()->get_min(), 
min_literal));
+        min_pred->add_child(probe_ctx->root());
+        min_pred->add_child(min_literal);
+        container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
+                min_pred_node, min_pred, get_comparison_ignore_thredhold()));
+        break;
+    }
+    case RuntimeFilterType::MAX_FILTER: {
+        vectorized::VExprSPtr max_pred;
+        // create max filter
+        TExprNode max_pred_node;
+        RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::LE, max_pred,
+                                              &max_pred_node, null_aware));
+        vectorized::VExprSPtr max_literal;
+        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(),
+                                       _wrapper->minmax_func()->get_max(), 
max_literal));
+        max_pred->add_child(probe_ctx->root());
+        max_pred->add_child(max_literal);
+        container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
+                max_pred_node, max_pred, get_comparison_ignore_thredhold()));
+        break;
+    }
+    case RuntimeFilterType::MINMAX_FILTER: {
+        vectorized::VExprSPtr max_pred;
+        // create max filter
+        TExprNode max_pred_node;
+        RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::LE, max_pred,
+                                              &max_pred_node, null_aware));
+        vectorized::VExprSPtr max_literal;
+        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(),
+                                       _wrapper->minmax_func()->get_max(), 
max_literal));
+        max_pred->add_child(probe_ctx->root());
+        max_pred->add_child(max_literal);
+        container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
+                max_pred_node, max_pred, get_comparison_ignore_thredhold(), 
null_aware));
+
+        vectorized::VExprContextSPtr new_probe_ctx;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, 
new_probe_ctx));
+
+        // create min filter
+        vectorized::VExprSPtr min_pred;
+        TExprNode min_pred_node;
+        RETURN_IF_ERROR(create_vbin_predicate(new_probe_ctx->root()->type(), 
TExprOpcode::GE,
+                                              min_pred, &min_pred_node, 
null_aware));
+        vectorized::VExprSPtr min_literal;
+        RETURN_IF_ERROR(create_literal(new_probe_ctx->root()->type(),
+                                       _wrapper->minmax_func()->get_min(), 
min_literal));
+        min_pred->add_child(new_probe_ctx->root());
+        min_pred->add_child(min_literal);
+        container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
+                min_pred_node, min_pred, get_comparison_ignore_thredhold(), 
null_aware));
+        break;
+    }
+    case RuntimeFilterType::BLOOM_FILTER: {
+        // create a bloom filter
+        TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+        type_desc.__set_is_nullable(false);
+        TExprNode node;
+        node.__set_type(type_desc);
+        node.__set_node_type(TExprNodeType::BLOOM_PRED);
+        node.__set_opcode(TExprOpcode::RT_FILTER);
+        node.__set_is_nullable(false);
+        auto bloom_pred = vectorized::VBloomPredicate::create_shared(node);
+        bloom_pred->set_filter(_wrapper->bloom_filter_func());
+        bloom_pred->add_child(probe_ctx->root());
+        auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
+                node, bloom_pred, get_bloom_filter_ignore_thredhold());
+        container.push_back(wrapper);
+        break;
+    }
+    case RuntimeFilterType::BITMAP_FILTER: {
+        // create a bitmap filter
+        TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+        type_desc.__set_is_nullable(false);
+        TExprNode node;
+        node.__set_type(type_desc);
+        node.__set_node_type(TExprNodeType::BITMAP_PRED);
+        node.__set_opcode(TExprOpcode::RT_FILTER);
+        node.__set_is_nullable(false);
+        auto bitmap_pred = vectorized::VBitmapPredicate::create_shared(node);
+        bitmap_pred->set_filter(_wrapper->bitmap_filter_func());
+        bitmap_pred->add_child(probe_ctx->root());
+        auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, 
bitmap_pred, 0);
+        container.push_back(wrapper);
+        break;
+    }
+    default:
+        DCHECK(false);
+        break;
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/runtime_filter/role/consumer.h 
b/be/src/runtime_filter/role/consumer.h
index 8b5a490baf1..c2926864c39 100644
--- a/be/src/runtime_filter/role/consumer.h
+++ b/be/src/runtime_filter/role/consumer.h
@@ -26,6 +26,12 @@ namespace doris {
 
 class RuntimeFilterConsumer : public RuntimeFilter {
 public:
+    enum class State {
+        NOT_READY,
+        READY,
+        TIMEOUT,
+        APPLIED,
+    };
     static Status create(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
                          int node_id, std::shared_ptr<RuntimeFilterConsumer>* 
res,
                          RuntimeProfile* parent_profile) {
@@ -36,28 +42,20 @@ public:
         return Status::OK();
     }
 
-    int node_id() const { return _node_id; }
-
+    // Published by producer.
     void signal(RuntimeFilter* other);
 
     std::shared_ptr<pipeline::RuntimeFilterTimer> create_filter_timer(
-            std::shared_ptr<pipeline::RuntimeFilterDependency> dependencie);
+            std::shared_ptr<pipeline::RuntimeFilterDependency> dependencies);
 
-    Status acquire_expr(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
-                        std::vector<vectorized::VRuntimeFilterPtr>& 
push_exprs);
+    // Called after `State` is ready (e.g. signaled)
+    Status acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>& 
push_exprs);
 
     std::string debug_string() const override {
         return fmt::format("Consumer: ({}, state: {})", _debug_string(), 
to_string(_rf_state));
     }
 
-    bool applied() { return _rf_state == State::APPLIED; }
-
-    enum class State {
-        NOT_READY,
-        READY,
-        TIMEOUT,
-        APPLIED,
-    };
+    bool is_applied() { return _rf_state == State::APPLIED; }
 
     static std::string to_string(const State& state) {
         switch (state) {
@@ -78,11 +76,10 @@ private:
     RuntimeFilterConsumer(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
                           int node_id, RuntimeProfile* parent_profile)
             : RuntimeFilter(state, desc),
-              _node_id(node_id),
               _probe_expr(desc->planId_to_target_expr.find(node_id)->second),
               _profile(new RuntimeProfile(fmt::format("RF{}", 
desc->filter_id))),
               _storage_profile(new RuntimeProfile(fmt::format("Storage", 
desc->filter_id))),
-              _excution_profile(new RuntimeProfile(fmt::format("Execution", 
desc->filter_id))),
+              _execution_profile(new RuntimeProfile(fmt::format("Execution", 
desc->filter_id))),
               _registration_time(MonotonicMillis()),
               _rf_state(State::NOT_READY) {
         // If bitmap filter is not applied, it will cause the query result to 
be incorrect
@@ -93,13 +90,13 @@ private:
 
         parent_profile->add_child(_profile.get(), true, nullptr);
         _profile->add_child(_storage_profile.get(), true, nullptr);
-        _profile->add_child(_excution_profile.get(), true, nullptr);
+        _profile->add_child(_execution_profile.get(), true, nullptr);
         _wait_timer = ADD_TIMER(_profile, "WaitTime");
     }
 
-    Status _apply_ready_expr(std::list<vectorized::VExprContextSPtr>& 
probe_ctxs,
-                             std::vector<vectorized::VRuntimeFilterPtr>& 
push_exprs);
-
+    Status _apply_ready_expr(std::vector<vectorized::VRuntimeFilterPtr>& 
push_exprs);
+    Status _get_push_exprs(std::vector<vectorized::VRuntimeFilterPtr>& 
container,
+                           const TExpr& probe_expr);
     void _check_state(std::vector<State> assumed_states) {
         if (!check_state_impl<RuntimeFilterConsumer>(_rf_state, 
assumed_states)) {
             throw Exception(ErrorCode::INTERNAL_ERROR,
@@ -113,15 +110,13 @@ private:
         _profile->add_info_string("Info", debug_string());
     }
 
-    int _node_id;
-
     TExpr _probe_expr;
 
     std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
 
     std::unique_ptr<RuntimeProfile> _profile;
-    std::unique_ptr<RuntimeProfile> _storage_profile;  // for storage layer 
stats
-    std::unique_ptr<RuntimeProfile> _excution_profile; // for execution layer 
stats
+    std::unique_ptr<RuntimeProfile> _storage_profile;   // for storage layer 
stats
+    std::unique_ptr<RuntimeProfile> _execution_profile; // for execution layer 
stats
     RuntimeProfile::Counter* _wait_timer = nullptr;
 
     int32_t _rf_wait_time_ms;
diff --git a/be/src/runtime_filter/role/merger.h 
b/be/src/runtime_filter/role/merger.h
index 627f6760afc..32308b5feda 100644
--- a/be/src/runtime_filter/role/merger.h
+++ b/be/src/runtime_filter/role/merger.h
@@ -60,8 +60,9 @@ public:
 
     bool add_rf_size(uint64_t size) {
         _received_rf_size_num++;
+        _received_sum_size += size;
         DCHECK_GE(_expected_producer_num, _received_rf_size_num) << 
debug_string();
-        return (_received_rf_size_num == _expected_producer_num);
+        return _received_rf_size_num == _expected_producer_num;
     }
 
     uint64_t get_received_sum_size() const { return _received_sum_size; }
@@ -90,10 +91,10 @@ private:
 
     std::atomic<State> _rf_state;
     int _expected_producer_num = 0;
-    std::atomic_int _received_producer_num = 0;
+    int _received_producer_num = 0;
 
     uint64_t _received_sum_size = 0;
-    std::atomic_int _received_rf_size_num = 0;
+    int _received_rf_size_num = 0;
 
     friend class RuntimeFilterProducer;
 };
diff --git a/be/src/runtime_filter/role/producer.cpp 
b/be/src/runtime_filter/role/producer.cpp
index d1c2286b41c..1cb44cb0501 100644
--- a/be/src/runtime_filter/role/producer.cpp
+++ b/be/src/runtime_filter/role/producer.cpp
@@ -147,7 +147,7 @@ Status RuntimeFilterProducer::send_size(
     // two case we need do local merge:
     // 1. has remote target
     // 2. has local target and has global consumer (means target scan has 
local shuffle)
-    if (_has_remote_target ||
+    if (!_has_local_target ||
         
!_state->global_runtime_filter_mgr()->get_consume_filters(_wrapper->filter_id()).empty())
 {
         LocalMergeContext* local_merge_filters = nullptr;
         
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
@@ -224,26 +224,7 @@ void RuntimeFilterProducer::set_synced_size(uint64_t 
global_size) {
 }
 
 Status RuntimeFilterProducer::init(size_t local_size) {
-    size_t real_size = _synced_size != -1 ? _synced_size : local_size;
-    if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
-        real_size > _wrapper->max_in_num()) {
-        RETURN_IF_ERROR(_wrapper->change_to_bloom_filter());
-    }
-
-    if (_wrapper->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
-        RETURN_IF_ERROR(_wrapper->init_bloom_filter(real_size));
-    }
-    if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER &&
-        real_size > _wrapper->max_in_num()) {
-        
set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED,
-                                               "reach max in num");
-    }
-    if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER && 
!_callback.empty()) {
-        for (auto& call : _callback) {
-            call();
-        }
-    }
-    return Status::OK();
+    return _wrapper->init(_synced_size != -1 ? _synced_size : local_size);
 }
 
 } // namespace doris
diff --git a/be/src/runtime_filter/role/producer.h 
b/be/src/runtime_filter/role/producer.h
index ae1b3708457..b71ac1e86dd 100644
--- a/be/src/runtime_filter/role/producer.h
+++ b/be/src/runtime_filter/role/producer.h
@@ -29,7 +29,6 @@ namespace doris {
  */
 class RuntimeFilterProducer : public RuntimeFilter {
 public:
-    using Callback = std::function<void()>;
     enum class State {
         WAITING_FOR_SEND_SIZE = 0,
         WAITING_FOR_SYNCED_SIZE = 1,
@@ -61,7 +60,7 @@ public:
             return;
         }
         _check_state({State::WAITING_FOR_DATA});
-        _wrapper->insert_batch(column, start);
+        _wrapper->insert(column, start);
     }
     Status publish(RuntimeState* state, bool build_hash_table);
     std::string debug_string() const override {
@@ -70,7 +69,6 @@ public:
                            _dependency ? _dependency->debug_string() : "none", 
_synced_size);
     }
 
-    void with_callback(Callback& callback) { _callback.emplace_back(callback); 
}
     int expr_order() const { return _expr_order; }
     void set_synced_size(uint64_t global_size);
     void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State 
state,
@@ -145,7 +143,6 @@ private:
 
     std::atomic<State> _rf_state;
     std::unique_ptr<RuntimeProfile> _profile;
-    std::vector<Callback> _callback;
 };
 
 } // namespace doris
diff --git a/be/src/runtime_filter/role/runtime_filter.cpp 
b/be/src/runtime_filter/role/runtime_filter.cpp
index 6289afd5d76..f5b902df7e2 100644
--- a/be/src/runtime_filter/role/runtime_filter.cpp
+++ b/be/src/runtime_filter/role/runtime_filter.cpp
@@ -103,7 +103,7 @@ Status RuntimeFilter::_init_with_desc(const 
TRuntimeFilterDesc* desc,
     params.enable_fixed_len_to_uint32_v2 = 
options->__isset.enable_fixed_len_to_uint32_v2 &&
                                            
options->enable_fixed_len_to_uint32_v2;
     if (_runtime_filter_type == RuntimeFilterType::BITMAP_FILTER) {
-        if (_has_remote_target) {
+        if (!_has_local_target) {
             return Status::InternalError("bitmap filter do not support remote 
target");
         }
         if (!build_ctx->root()->type().is_bitmap_type()) {
diff --git a/be/src/runtime_filter/role/runtime_filter.h 
b/be/src/runtime_filter/role/runtime_filter.h
index 41623fbc74d..f4cb1f10301 100644
--- a/be/src/runtime_filter/role/runtime_filter.h
+++ b/be/src/runtime_filter/role/runtime_filter.h
@@ -44,8 +44,8 @@ public:
     bool has_local_target() const { return _has_local_target; }
 
     template <class T>
-    Status assign_data_into_wrapper(const T& request, 
butil::IOBufAsZeroCopyInputStream* data) {
-        return _wrapper->assign_data(request, data);
+    Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) {
+        return _wrapper->assign(request, data);
     }
 
     template <class T>
@@ -68,10 +68,8 @@ public:
             auto in_filter = request->mutable_in_filter();
             _to_protobuf(in_filter);
         } else if (real_runtime_filter_type == 
RuntimeFilterType::BLOOM_FILTER) {
-            _wrapper->get_bloom_filter_desc((char**)data, len);
             DCHECK(data != nullptr);
-            request->mutable_bloom_filter()->set_filter_length(*len);
-            request->mutable_bloom_filter()->set_always_true(false);
+            _to_protobuf(request->mutable_bloom_filter(), (char**)data, len);
         } else if (real_runtime_filter_type == 
RuntimeFilterType::MINMAX_FILTER ||
                    real_runtime_filter_type == RuntimeFilterType::MIN_FILTER ||
                    real_runtime_filter_type == RuntimeFilterType::MAX_FILTER) {
@@ -88,10 +86,9 @@ public:
 protected:
     RuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* 
desc)
             : _state(state),
-              _has_remote_target(desc->has_remote_targets),
               _has_local_target(desc->has_local_targets),
               _runtime_filter_type(get_runtime_filter_type(desc)) {
-        DCHECK_NE(_has_remote_target, _has_local_target);
+        DCHECK_NE(desc->has_remote_targets, _has_local_target);
     }
 
     Status _init_with_desc(const TRuntimeFilterDesc* desc, const 
TQueryOptions* options);
@@ -100,6 +97,9 @@ protected:
     void _to_protobuf(T* filter) {
         _wrapper->_to_protobuf(filter);
     }
+    void _to_protobuf(PBloomFilter* filter, char** data, int* filter_length) {
+        _wrapper->_to_protobuf(filter, data, filter_length);
+    }
 
     Status _push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
 
@@ -118,8 +118,6 @@ protected:
     // _wrapper is a runtime filter function wrapper
     std::shared_ptr<RuntimeFilterWrapper> _wrapper;
 
-    // will apply to remote node
-    bool _has_remote_target;
     // will apply to local node
     bool _has_local_target;
 
diff --git a/be/src/runtime_filter/runtime_filter_helper.cpp 
b/be/src/runtime_filter/runtime_filter_helper.cpp
index c2851ba7e71..85c718c74f6 100644
--- a/be/src/runtime_filter/runtime_filter_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_helper.cpp
@@ -24,12 +24,10 @@ namespace doris::pipeline {
 
 RuntimeFilterHelper::RuntimeFilterHelper(const int32_t _node_id,
                                          const 
std::vector<TRuntimeFilterDesc>& runtime_filters,
-                                         const RowDescriptor& row_descriptor,
-                                         vectorized::VExprContextSPtrs& 
conjuncts)
+                                         const RowDescriptor& row_descriptor)
         : _node_id(_node_id),
           _runtime_filter_descs(runtime_filters),
           _row_descriptor_ref(row_descriptor),
-          _conjuncts_ref(conjuncts),
           _profile(new RuntimeProfile("RuntimeFilterHelper")) {
     _blocked_by_rf = std::make_shared<std::atomic_bool>(false);
 }
@@ -85,22 +83,23 @@ void RuntimeFilterHelper::init_runtime_filter_dependency(
     }
 }
 
-Status RuntimeFilterHelper::acquire_runtime_filter() {
+Status 
RuntimeFilterHelper::acquire_runtime_filter(vectorized::VExprContextSPtrs& 
conjuncts) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     std::vector<vectorized::VRuntimeFilterPtr> vexprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
-        RETURN_IF_ERROR(_consumers[i]->acquire_expr(_probe_ctxs, vexprs));
+        RETURN_IF_ERROR(_consumers[i]->acquire_expr(vexprs));
 
-        if (!_consumers[i]->applied()) {
+        if (!_consumers[i]->is_applied()) {
             _is_all_rf_applied = false;
         }
     }
-    RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
+    RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs, conjuncts));
     return Status::OK();
 }
 
 Status RuntimeFilterHelper::_append_rf_into_conjuncts(
-        const std::vector<vectorized::VRuntimeFilterPtr>& vexprs) {
+        const std::vector<vectorized::VRuntimeFilterPtr>& vexprs,
+        vectorized::VExprContextSPtrs& conjuncts) {
     if (vexprs.empty()) {
         return Status::OK();
     }
@@ -109,13 +108,14 @@ Status RuntimeFilterHelper::_append_rf_into_conjuncts(
         vectorized::VExprContextSPtr conjunct = 
vectorized::VExprContext::create_shared(expr);
         RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref));
         RETURN_IF_ERROR(conjunct->open(_state));
-        _conjuncts_ref.emplace_back(conjunct);
+        conjuncts.emplace_back(conjunct);
     }
 
     return Status::OK();
 }
 
-Status RuntimeFilterHelper::try_append_late_arrival_runtime_filter(int* 
arrived_rf_num) {
+Status RuntimeFilterHelper::try_append_late_arrival_runtime_filter(
+        int* arrived_rf_num, vectorized::VExprContextSPtrs& conjuncts) {
     if (_is_all_rf_applied) {
         *arrived_rf_num = _runtime_filter_descs.size();
         return Status::OK();
@@ -133,12 +133,12 @@ Status 
RuntimeFilterHelper::try_append_late_arrival_runtime_filter(int* arrived_
     std::vector<vectorized::VRuntimeFilterPtr> exprs;
     int current_arrived_rf_num = 0;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
-        RETURN_IF_ERROR(_consumers[i]->acquire_expr(_probe_ctxs, exprs));
-        current_arrived_rf_num += _consumers[i]->applied();
+        RETURN_IF_ERROR(_consumers[i]->acquire_expr(exprs));
+        current_arrived_rf_num += _consumers[i]->is_applied();
     }
     // 2. Append unapplied runtime filters to _conjuncts
     if (!exprs.empty()) {
-        RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs));
+        RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs, conjuncts));
     }
     if (current_arrived_rf_num == _runtime_filter_descs.size()) {
         _is_all_rf_applied = true;
diff --git a/be/src/runtime_filter/runtime_filter_helper.h 
b/be/src/runtime_filter/runtime_filter_helper.h
index fa1a421de7b..c9fafd58b0b 100644
--- a/be/src/runtime_filter/runtime_filter_helper.h
+++ b/be/src/runtime_filter/runtime_filter_helper.h
@@ -28,32 +28,31 @@ class RuntimeFilterHelper {
 public:
     RuntimeFilterHelper(const int32_t node_id,
                         const std::vector<TRuntimeFilterDesc>& runtime_filters,
-                        const RowDescriptor& row_descriptor,
-                        vectorized::VExprContextSPtrs& conjuncts);
+                        const RowDescriptor& row_descriptor);
     ~RuntimeFilterHelper() = default;
 
     Status init(RuntimeState* state, RuntimeProfile* profile, bool 
need_local_merge);
-
-    // Try to append late arrived runtime filters.
-    // Return num of filters which are applied already.
-    Status try_append_late_arrival_runtime_filter(int* arrived_rf_num);
+    // Get all arrived runtime filters at Open phase which will be push down 
to storage.
+    // Called by Operator.
+    Status acquire_runtime_filter(vectorized::VExprContextSPtrs& conjuncts);
+    // The un-arrival filters will be checked every time the scanner is 
scheduled.
+    // And once new runtime filters arrived, we will use it to do operator's 
filtering.
+    // Called by Scanner.
+    Status try_append_late_arrival_runtime_filter(int* arrived_rf_num,
+                                                  
vectorized::VExprContextSPtrs& conjuncts);
 
     void init_runtime_filter_dependency(
             std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
                     runtime_filter_dependencies,
             const int id, const int node_id, const std::string& name);
 
-    // Get all arrived runtime filters at Open phase.
-    Status acquire_runtime_filter();
-
-    bool is_all_rf_applied() const { return _is_all_rf_applied; }
-
 private:
     // Register and get all runtime filters at Init phase.
     Status _register_runtime_filter(bool need_local_merge);
 
     // Append late-arrival runtime filters to the vconjunct_ctx.
-    Status _append_rf_into_conjuncts(const 
std::vector<vectorized::VRuntimeFilterPtr>& vexprs);
+    Status _append_rf_into_conjuncts(const 
std::vector<vectorized::VRuntimeFilterPtr>& vexprs,
+                                     vectorized::VExprContextSPtrs& conjuncts);
 
     std::vector<std::shared_ptr<RuntimeFilterConsumer>> _consumers;
     std::mutex _rf_locks;
@@ -61,18 +60,13 @@ private:
 
     int32_t _node_id;
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
-    std::list<vectorized::VExprContextSPtr> _probe_ctxs;
-
     const RowDescriptor& _row_descriptor_ref;
 
-    vectorized::VExprContextSPtrs& _conjuncts_ref;
-
     // True means all runtime filters are applied to scanners
     bool _is_all_rf_applied = true;
     std::shared_ptr<std::atomic_bool> _blocked_by_rf;
 
     RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
-
     std::unique_ptr<RuntimeProfile> _profile;
 };
 
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index ba222c22b94..d8ebeeacb9f 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -88,7 +88,7 @@ Status RuntimeFilterMgr::register_consumer_filter(
     return Status::OK();
 }
 
-Status RuntimeFilterMgr::register_local_merger_filter(
+Status RuntimeFilterMgr::register_local_merger_producer_filter(
         const TRuntimeFilterDesc& desc, const TQueryOptions& options,
         std::shared_ptr<RuntimeFilterProducer> producer_filter) {
     DCHECK(_is_global);
@@ -137,15 +137,14 @@ Status RuntimeFilterMgr::register_producer_filter(
     DCHECK(!_is_global);
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
-    std::lock_guard<std::mutex> l(_lock);
-    auto iter = _producer_map.find(key);
-
     DCHECK(_state != nullptr);
-    if (iter != _producer_map.end()) {
-        return Status::InvalidArgument("filter has registed");
+
+    std::lock_guard<std::mutex> l(_lock);
+    if (_producer_id_set.contains(key)) {
+        return Status::InvalidArgument("filter {} has been registered", key);
     }
     RETURN_IF_ERROR(RuntimeFilterProducer::create(_state, &desc, 
producer_filter, parent_profile));
-    _producer_map.emplace(key, *producer_filter);
+    _producer_id_set.insert(key);
     return Status::OK();
 }
 
@@ -235,6 +234,7 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::weak_ptr<QueryC
     cnt_val.source_addrs.push_back(request->source_addr());
 
     Status st = Status::OK();
+    // After all runtime filters' size are collected, we should send response 
to all producers.
     if (cnt_val.merger->add_rf_size(request->filter_size())) {
         auto ctx = query_ctx.lock()->ignore_runtime_filter_error() ? 
std::weak_ptr<QueryContext> {}
                                                                    : query_ctx;
@@ -311,7 +311,7 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> que
         RETURN_IF_ERROR(RuntimeFilterProducer::create(_state, 
&cnt_val.runtime_filter_desc,
                                                       &tmp_filter, nullptr));
 
-        RETURN_IF_ERROR(tmp_filter->assign_data_into_wrapper(*request, 
attach_data));
+        RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data));
 
         RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get()));
 
@@ -395,27 +395,6 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> que
     return st;
 }
 
-Status RuntimeFilterMergeController::acquire(
-        UniqueId query_id, 
std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
-    uint32_t shard = _get_controller_shard_idx(query_id);
-    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
-    auto iter = _filter_controller_map[shard].find(query_id);
-    if (iter == _filter_controller_map[shard].end()) {
-        return Status::InvalidArgument("not found entity, query-id:{}", 
query_id.to_string());
-    }
-    *handle = _filter_controller_map[shard][query_id].lock();
-    if (*handle == nullptr) {
-        return Status::InvalidArgument("entity is closed");
-    }
-    return Status::OK();
-}
-
-void RuntimeFilterMergeController::remove_entity(UniqueId query_id) {
-    uint32_t shard = _get_controller_shard_idx(query_id);
-    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
-    _filter_controller_map[shard].erase(query_id);
-}
-
 RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* 
state) {
     RuntimeFilterParamsContext* params =
             state->get_query_ctx()->obj_pool.add(new 
RuntimeFilterParamsContext());
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h 
b/be/src/runtime_filter/runtime_filter_mgr.h
index 507e6ad085b..e0b53409c59 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -81,38 +81,27 @@ public:
 
     ~RuntimeFilterMgr();
 
+    // get/set consumer
     std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
get_consume_filters(int filter_id);
-
-    std::shared_ptr<RuntimeFilterProducer> try_get_product_filter(const int 
filter_id) {
-        std::lock_guard<std::mutex> l(_lock);
-        auto iter = _producer_map.find(filter_id);
-        if (iter == _producer_map.end()) {
-            return nullptr;
-        }
-        return iter->second;
-    }
-
-    // register filter
     Status register_consumer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
                                     int node_id,
                                     std::shared_ptr<RuntimeFilterConsumer>* 
consumer_filter,
                                     bool need_local_merge, RuntimeProfile* 
parent_profile);
 
-    Status register_local_merger_filter(const TRuntimeFilterDesc& desc,
-                                        const TQueryOptions& options,
-                                        std::shared_ptr<RuntimeFilterProducer> 
producer_filter);
-
+    // get/set local-merge producer
+    Status register_local_merger_producer_filter(
+            const TRuntimeFilterDesc& desc, const TQueryOptions& options,
+            std::shared_ptr<RuntimeFilterProducer> producer_filter);
     Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** 
local_merge_filters);
 
+    // Create local producer. This producer is hold by RuntimeFilterSlots.
     Status register_producer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
                                     std::shared_ptr<RuntimeFilterProducer>* 
producer_filter,
                                     RuntimeProfile* parent_profile);
 
     // update filter by remote
     void set_runtime_filter_params(const TRuntimeFilterParams& 
runtime_filter_params);
-
     Status get_merge_addr(TNetworkAddress* addr);
-
     Status sync_filter_size(const PSyncFilterSizeRequest* request);
 
 private:
@@ -132,7 +121,7 @@ private:
     // use filter_id as key
     // key: "filter-id"
     std::map<int32_t, std::vector<std::shared_ptr<RuntimeFilterConsumer>>> 
_consumer_map;
-    std::map<int32_t, std::shared_ptr<RuntimeFilterProducer>> _producer_map;
+    std::set<int32_t> _producer_id_set;
     std::map<int32_t, LocalMergeContext> _local_merge_map;
 
     RuntimeFilterParamsContext* _state = nullptr;
@@ -182,68 +171,4 @@ private:
     RuntimeFilterParamsContext* _state = nullptr;
 };
 
-// RuntimeFilterMergeController has a map query-id -> entity
-class RuntimeFilterMergeController {
-public:
-    RuntimeFilterMergeController() = default;
-    ~RuntimeFilterMergeController() = default;
-
-    // thread safe
-    // add a query-id -> entity
-    // If a query-id -> entity already exists
-    // add_entity will return a exists entity
-    Status add_entity(const auto& params, UniqueId query_id, const 
TQueryOptions& query_options,
-                      std::shared_ptr<RuntimeFilterMergeControllerEntity>* 
handle,
-                      RuntimeFilterParamsContext* state) {
-        if (!params.__isset.runtime_filter_params ||
-            params.runtime_filter_params.rid_to_runtime_filter.size() == 0) {
-            return Status::OK();
-        }
-
-        // TODO: why we need string, direct use UniqueId
-        uint32_t shard = _get_controller_shard_idx(query_id);
-        std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
-        auto iter = _filter_controller_map[shard].find(query_id);
-        if (iter == _filter_controller_map[shard].end()) {
-            *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
-                    new RuntimeFilterMergeControllerEntity(state),
-                    [this](RuntimeFilterMergeControllerEntity* entity) {
-                        remove_entity(entity->query_id());
-                        delete entity;
-                    });
-            _filter_controller_map[shard][query_id] = *handle;
-            const TRuntimeFilterParams& filter_params = 
params.runtime_filter_params;
-            RETURN_IF_ERROR(handle->get()->init(query_id, filter_params, 
query_options));
-        } else {
-            *handle = _filter_controller_map[shard][query_id].lock();
-        }
-        return Status::OK();
-    }
-
-    // thread safe
-    // increase a reference count
-    // if a query-id is not exist
-    // Status.not_ok will be returned and a empty ptr will returned by *handle
-    Status acquire(UniqueId query_id, 
std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle);
-
-    // thread safe
-    // remove a entity by query-id
-    // remove_entity will be called automatically by entity when entity is 
destroyed
-    void remove_entity(UniqueId query_id);
-
-    static const int kShardNum = 128;
-
-private:
-    uint32_t _get_controller_shard_idx(UniqueId& query_id) {
-        return (uint32_t)query_id.hi % kShardNum;
-    }
-
-    std::mutex _controller_mutex[kShardNum];
-    // We store the weak pointer here.
-    // When the external object is destroyed, we need to clear this record
-    using FilterControllerMap =
-            std::unordered_map<UniqueId, 
std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
-    // str(query-id) -> entity
-    FilterControllerMap _filter_controller_map[kShardNum];
-};
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_slots.cpp 
b/be/src/runtime_filter/runtime_filter_slots.cpp
index 5031c59a04b..26fb7eca46f 100644
--- a/be/src/runtime_filter/runtime_filter_slots.cpp
+++ b/be/src/runtime_filter/runtime_filter_slots.cpp
@@ -27,27 +27,9 @@ namespace doris {
 Status RuntimeFilterSlots::init(RuntimeState* state,
                                 const std::vector<TRuntimeFilterDesc>& 
runtime_filter_descs) {
     _runtime_filters.resize(runtime_filter_descs.size());
-    std::unordered_map<int, RuntimeFilterProducer*> id_to_in_filter;
     for (size_t i = 0; i < runtime_filter_descs.size(); i++) {
         RETURN_IF_ERROR(state->register_producer_runtime_filter(
                 runtime_filter_descs[i], &_runtime_filters[i], 
_profile.get()));
-        if (runtime_filter_descs[i].type == TRuntimeFilterType::IN) {
-            id_to_in_filter.insert({runtime_filter_descs[i].expr_order, 
_runtime_filters[i].get()});
-        } else if (runtime_filter_descs[i].type == 
TRuntimeFilterType::IN_OR_BLOOM &&
-                   
!id_to_in_filter.contains(runtime_filter_descs[i].expr_order)) {
-            id_to_in_filter.insert({runtime_filter_descs[i].expr_order, 
_runtime_filters[i].get()});
-        }
-    }
-    for (size_t i = 0; i < runtime_filter_descs.size(); i++) {
-        if (id_to_in_filter.contains(_runtime_filters[i]->expr_order()) &&
-            _runtime_filters[i].get() != 
id_to_in_filter[_runtime_filters[i]->expr_order()]) {
-            RuntimeFilterProducer::Callback callback =
-                    [&, filter = _runtime_filters[i].get()]() -> void {
-                filter->set_wrapper_state_and_ready_to_publish(
-                        RuntimeFilterWrapper::State::DISABLED, "exist 
in_filter");
-            };
-            
id_to_in_filter[_runtime_filters[i]->expr_order()]->with_callback(callback);
-        }
     }
     return Status::OK();
 }
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp 
b/be/src/runtime_filter/runtime_filter_wrapper.cpp
index f688af21a4c..c81e04758f2 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.cpp
+++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp
@@ -18,17 +18,12 @@
 #include "runtime_filter/runtime_filter_wrapper.h"
 
 #include "exprs/create_predicate_function.h"
-#include "vec/exprs/vbitmap_predicate.h"
-#include "vec/exprs/vbloom_predicate.h"
-#include "vec/exprs/vdirect_in_predicate.h"
-#include "vec/exprs/vexpr_context.h"
 
 namespace doris {
 
 RuntimeFilterWrapper::RuntimeFilterWrapper(const RuntimeFilterParams* params)
         : RuntimeFilterWrapper(params->column_return_type, 
params->filter_type, params->filter_id,
-                               State::UNINITED) {
-    _max_in_num = params->max_in_num;
+                               State::UNINITED, params->max_in_num) {
     switch (_filter_type) {
     case RuntimeFilterType::IN_FILTER: {
         _hybrid_set.reset(create_set(_column_return_type));
@@ -75,150 +70,14 @@ RuntimeFilterWrapper::RuntimeFilterWrapper(const 
RuntimeFilterParams* params)
     }
 }
 
-Status 
RuntimeFilterWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>& 
probe_ctxs,
-                                            
std::vector<vectorized::VRuntimeFilterPtr>& container,
-                                            const TExpr& probe_expr) {
-    vectorized::VExprContextSPtr probe_ctx;
-    RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, 
probe_ctx));
-    probe_ctxs.push_back(probe_ctx);
-    DCHECK(probe_ctx->root()->type().type == _column_return_type ||
-           (is_string_type(probe_ctx->root()->type().type) &&
-            is_string_type(_column_return_type)) ||
-           _filter_type == RuntimeFilterType::BITMAP_FILTER)
-            << " prob_expr->root()->type().type: " << 
int(probe_ctx->root()->type().type)
-            << " _column_return_type: " << int(_column_return_type)
-            << " _filter_type: " << filter_type_to_string(_filter_type);
-
-    auto real_filter_type = get_real_type();
-    bool null_aware = contain_null();
-    switch (real_filter_type) {
-    case RuntimeFilterType::IN_FILTER: {
-        TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
-        type_desc.__set_is_nullable(false);
-        TExprNode node;
-        node.__set_type(type_desc);
-        node.__set_node_type(null_aware ? TExprNodeType::NULL_AWARE_IN_PRED
-                                        : TExprNodeType::IN_PRED);
-        node.in_predicate.__set_is_not_in(false);
-        node.__set_opcode(TExprOpcode::FILTER_IN);
-        node.__set_is_nullable(false);
-        auto in_pred = vectorized::VDirectInPredicate::create_shared(node, 
_hybrid_set);
-        in_pred->add_child(probe_ctx->root());
-        auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
-                node, in_pred, 
get_in_list_ignore_thredhold(_hybrid_set->size()), null_aware);
-        container.push_back(wrapper);
-        break;
-    }
-    case RuntimeFilterType::MIN_FILTER: {
-        // create min filter
-        vectorized::VExprSPtr min_pred;
-        TExprNode min_pred_node;
-        RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::GE, min_pred,
-                                              &min_pred_node, null_aware));
-        vectorized::VExprSPtr min_literal;
-        RETURN_IF_ERROR(
-                create_literal(probe_ctx->root()->type(), 
_minmax_func->get_min(), min_literal));
-        min_pred->add_child(probe_ctx->root());
-        min_pred->add_child(min_literal);
-        container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
-                min_pred_node, min_pred, get_comparison_ignore_thredhold()));
-        break;
-    }
-    case RuntimeFilterType::MAX_FILTER: {
-        vectorized::VExprSPtr max_pred;
-        // create max filter
-        TExprNode max_pred_node;
-        RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::LE, max_pred,
-                                              &max_pred_node, null_aware));
-        vectorized::VExprSPtr max_literal;
-        RETURN_IF_ERROR(
-                create_literal(probe_ctx->root()->type(), 
_minmax_func->get_max(), max_literal));
-        max_pred->add_child(probe_ctx->root());
-        max_pred->add_child(max_literal);
-        container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
-                max_pred_node, max_pred, get_comparison_ignore_thredhold()));
-        break;
-    }
-    case RuntimeFilterType::MINMAX_FILTER: {
-        vectorized::VExprSPtr max_pred;
-        // create max filter
-        TExprNode max_pred_node;
-        RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::LE, max_pred,
-                                              &max_pred_node, null_aware));
-        vectorized::VExprSPtr max_literal;
-        RETURN_IF_ERROR(
-                create_literal(probe_ctx->root()->type(), 
_minmax_func->get_max(), max_literal));
-        max_pred->add_child(probe_ctx->root());
-        max_pred->add_child(max_literal);
-        container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
-                max_pred_node, max_pred, get_comparison_ignore_thredhold(), 
null_aware));
-
-        vectorized::VExprContextSPtr new_probe_ctx;
-        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, 
new_probe_ctx));
-        probe_ctxs.push_back(new_probe_ctx);
-
-        // create min filter
-        vectorized::VExprSPtr min_pred;
-        TExprNode min_pred_node;
-        RETURN_IF_ERROR(create_vbin_predicate(new_probe_ctx->root()->type(), 
TExprOpcode::GE,
-                                              min_pred, &min_pred_node, 
null_aware));
-        vectorized::VExprSPtr min_literal;
-        RETURN_IF_ERROR(create_literal(new_probe_ctx->root()->type(), 
_minmax_func->get_min(),
-                                       min_literal));
-        min_pred->add_child(new_probe_ctx->root());
-        min_pred->add_child(min_literal);
-        container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
-                min_pred_node, min_pred, get_comparison_ignore_thredhold(), 
null_aware));
-        break;
-    }
-    case RuntimeFilterType::BLOOM_FILTER: {
-        // create a bloom filter
-        TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
-        type_desc.__set_is_nullable(false);
-        TExprNode node;
-        node.__set_type(type_desc);
-        node.__set_node_type(TExprNodeType::BLOOM_PRED);
-        node.__set_opcode(TExprOpcode::RT_FILTER);
-        node.__set_is_nullable(false);
-        auto bloom_pred = vectorized::VBloomPredicate::create_shared(node);
-        bloom_pred->set_filter(_bloom_filter_func);
-        bloom_pred->add_child(probe_ctx->root());
-        auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
-                node, bloom_pred, get_bloom_filter_ignore_thredhold());
-        container.push_back(wrapper);
-        break;
-    }
-    case RuntimeFilterType::BITMAP_FILTER: {
-        // create a bitmap filter
-        TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
-        type_desc.__set_is_nullable(false);
-        TExprNode node;
-        node.__set_type(type_desc);
-        node.__set_node_type(TExprNodeType::BITMAP_PRED);
-        node.__set_opcode(TExprOpcode::RT_FILTER);
-        node.__set_is_nullable(false);
-        auto bitmap_pred = vectorized::VBitmapPredicate::create_shared(node);
-        bitmap_pred->set_filter(_bitmap_filter_func);
-        bitmap_pred->add_child(probe_ctx->root());
-        auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, 
bitmap_pred, 0);
-        container.push_back(wrapper);
-        break;
-    }
-    default:
-        DCHECK(false);
-        break;
-    }
-    return Status::OK();
-}
-
-Status RuntimeFilterWrapper::change_to_bloom_filter() {
+Status RuntimeFilterWrapper::_change_to_bloom_filter() {
     if (_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) {
         return Status::InternalError("Can not change to bloom filter, {}", 
debug_string());
     }
     BloomFilterFuncBase* bf = _bloom_filter_func.get();
 
     if (bf != nullptr) {
-        insert_to_bloom_filter(bf);
+        _insert(bf);
     } else if (_hybrid_set != nullptr && _hybrid_set->size() != 0) {
         return Status::InternalError("change to bloom filter need empty set, 
{}", debug_string());
     }
@@ -228,7 +87,7 @@ Status RuntimeFilterWrapper::change_to_bloom_filter() {
     return Status::OK();
 }
 
-void RuntimeFilterWrapper::batch_assign(
+void RuntimeFilterWrapper::_batch_assign(
         const PInFilter& filter,
         void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set, 
PColumnValue&)) {
     for (int i = 0; i < filter.values_size(); ++i) {
@@ -237,16 +96,20 @@ void RuntimeFilterWrapper::batch_assign(
     }
 }
 
-Status RuntimeFilterWrapper::init_bloom_filter(const size_t runtime_size) {
-    if (_filter_type != RuntimeFilterType::BLOOM_FILTER &&
-        _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-        throw Exception(ErrorCode::INTERNAL_ERROR, "init_bloom_filter meet 
invalid input type {}",
-                        int(_filter_type));
+Status RuntimeFilterWrapper::init(const size_t real_size) {
+    if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && real_size > 
_max_in_num) {
+        RETURN_IF_ERROR(_change_to_bloom_filter());
+    }
+    if (get_real_type() == RuntimeFilterType::IN_FILTER && real_size > 
_max_in_num) {
+        set_state(RuntimeFilterWrapper::State::DISABLED, "reach max in num");
     }
-    return _bloom_filter_func->init_with_runtime_size(runtime_size);
+    if (_bloom_filter_func) {
+        RETURN_IF_ERROR(_bloom_filter_func->init_with_runtime_size(real_size));
+    }
+    return Status::OK();
 }
 
-void RuntimeFilterWrapper::insert_to_bloom_filter(BloomFilterFuncBase* 
bloom_filter) const {
+void RuntimeFilterWrapper::_insert(BloomFilterFuncBase* bloom_filter) const {
     if (_hybrid_set->size() > 0) {
         auto* it = _hybrid_set->begin();
         while (it->has_next()) {
@@ -259,7 +122,7 @@ void 
RuntimeFilterWrapper::insert_to_bloom_filter(BloomFilterFuncBase* bloom_fil
     }
 }
 
-void RuntimeFilterWrapper::insert_fixed_len(const vectorized::ColumnPtr& 
column, size_t start) {
+void RuntimeFilterWrapper::insert(const vectorized::ColumnPtr& column, size_t 
start) {
     switch (_filter_type) {
     case RuntimeFilterType::IN_FILTER: {
         _hybrid_set->insert_fixed_len(column, start);
@@ -283,36 +146,35 @@ void RuntimeFilterWrapper::insert_fixed_len(const 
vectorized::ColumnPtr& column,
         }
         break;
     }
+    case RuntimeFilterType::BITMAP_FILTER: {
+        std::vector<const BitmapValue*> bitmaps;
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& col =
+                    assert_cast<const 
vectorized::ColumnBitmap&>(nullable->get_nested_column());
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+            for (size_t i = start; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    bitmaps.push_back(&(col.get_data()[i]));
+                }
+            }
+        } else {
+            const auto* col = assert_cast<const 
vectorized::ColumnBitmap*>(column.get());
+            for (size_t i = start; i < column->size(); i++) {
+                bitmaps.push_back(&(col->get_data()[i]));
+            }
+        }
+        _bitmap_filter_func->insert_many(bitmaps);
+        break;
+    }
     default:
         DCHECK(false);
         break;
     }
 }
 
-void RuntimeFilterWrapper::bitmap_filter_insert_batch(const 
vectorized::ColumnPtr column,
-                                                      size_t start) {
-    std::vector<const BitmapValue*> bitmaps;
-    if (column->is_nullable()) {
-        const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
-        const auto& col =
-                assert_cast<const 
vectorized::ColumnBitmap&>(nullable->get_nested_column());
-        const auto& nullmap =
-                assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
-                        .get_data();
-        for (size_t i = start; i < column->size(); i++) {
-            if (!nullmap[i]) {
-                bitmaps.push_back(&(col.get_data()[i]));
-            }
-        }
-    } else {
-        const auto* col = assert_cast<const 
vectorized::ColumnBitmap*>(column.get());
-        for (size_t i = start; i < column->size(); i++) {
-            bitmaps.push_back(&(col->get_data()[i]));
-        }
-    }
-    _bitmap_filter_func->insert_many(bitmaps);
-}
-
 bool RuntimeFilterWrapper::build_bf_by_runtime_size() const {
     return _bloom_filter_func ? _bloom_filter_func->build_bf_by_runtime_size() 
: false;
 }
@@ -367,17 +229,17 @@ Status RuntimeFilterWrapper::merge(const 
RuntimeFilterWrapper* other) {
                 if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) {
                     // case2: use default size to init bf
                     
RETURN_IF_ERROR(_bloom_filter_func->init_with_fixed_length());
-                    RETURN_IF_ERROR(change_to_bloom_filter());
+                    RETURN_IF_ERROR(_change_to_bloom_filter());
                 }
             } else {
                 // case1&case2: use input bf directly and insert hybrid set 
data into bf
                 _bloom_filter_func = other->_bloom_filter_func;
-                RETURN_IF_ERROR(change_to_bloom_filter());
+                RETURN_IF_ERROR(_change_to_bloom_filter());
             }
         } else {
             if (other_filter_type == RuntimeFilterType::IN_FILTER) {
                 // case2: insert data to global filter
-                other->insert_to_bloom_filter(_bloom_filter_func.get());
+                other->_insert(_bloom_filter_func.get());
             } else {
                 // case1&case2: all input bf must has same size
                 
RETURN_IF_ERROR(_bloom_filter_func->merge(other->_bloom_filter_func.get()));
@@ -396,7 +258,7 @@ Status RuntimeFilterWrapper::merge(const 
RuntimeFilterWrapper* other) {
     return Status::OK();
 }
 
-Status RuntimeFilterWrapper::assign(const PInFilter& in_filter, bool 
contain_null) {
+Status RuntimeFilterWrapper::_assign(const PInFilter& in_filter, bool 
contain_null) {
     if (contain_null) {
         _hybrid_set->set_null_aware(true);
         _hybrid_set->insert((const void*)nullptr);
@@ -404,42 +266,42 @@ Status RuntimeFilterWrapper::assign(const PInFilter& 
in_filter, bool contain_nul
 
     switch (_column_return_type) {
     case TYPE_BOOLEAN: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             bool bool_val = column.boolval();
             set->insert(&bool_val);
         });
         break;
     }
     case TYPE_TINYINT: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             auto int_val = static_cast<int8_t>(column.intval());
             set->insert(&int_val);
         });
         break;
     }
     case TYPE_SMALLINT: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             auto int_val = static_cast<int16_t>(column.intval());
             set->insert(&int_val);
         });
         break;
     }
     case TYPE_INT: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             int32_t int_val = column.intval();
             set->insert(&int_val);
         });
         break;
     }
     case TYPE_BIGINT: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             int64_t long_val = column.longval();
             set->insert(&long_val);
         });
         break;
     }
     case TYPE_LARGEINT: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             auto string_val = column.stringval();
             StringParser::ParseResult result;
             auto int128_val = 
StringParser::string_to_int<int128_t>(string_val.c_str(),
@@ -450,28 +312,28 @@ Status RuntimeFilterWrapper::assign(const PInFilter& 
in_filter, bool contain_nul
         break;
     }
     case TYPE_FLOAT: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             auto float_val = static_cast<float>(column.doubleval());
             set->insert(&float_val);
         });
         break;
     }
     case TYPE_DOUBLE: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             double double_val = column.doubleval();
             set->insert(&double_val);
         });
         break;
     }
     case TYPE_DATEV2: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             auto date_v2_val = column.intval();
             set->insert(&date_v2_val);
         });
         break;
     }
     case TYPE_DATETIMEV2: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             auto date_v2_val = column.longval();
             set->insert(&date_v2_val);
         });
@@ -479,7 +341,7 @@ Status RuntimeFilterWrapper::assign(const PInFilter& 
in_filter, bool contain_nul
     }
     case TYPE_DATETIME:
     case TYPE_DATE: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             const auto& string_val_ref = column.stringval();
             VecDateTimeValue datetime_val;
             datetime_val.from_date_str(string_val_ref.c_str(), 
string_val_ref.length());
@@ -488,7 +350,7 @@ Status RuntimeFilterWrapper::assign(const PInFilter& 
in_filter, bool contain_nul
         break;
     }
     case TYPE_DECIMALV2: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             const auto& string_val_ref = column.stringval();
             DecimalV2Value decimal_val(string_val_ref);
             set->insert(&decimal_val);
@@ -496,21 +358,21 @@ Status RuntimeFilterWrapper::assign(const PInFilter& 
in_filter, bool contain_nul
         break;
     }
     case TYPE_DECIMAL32: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             int32_t decimal_32_val = column.intval();
             set->insert(&decimal_32_val);
         });
         break;
     }
     case TYPE_DECIMAL64: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             int64_t decimal_64_val = column.longval();
             set->insert(&decimal_64_val);
         });
         break;
     }
     case TYPE_DECIMAL128I: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             auto string_val = column.stringval();
             StringParser::ParseResult result;
             auto int128_val = 
StringParser::string_to_int<int128_t>(string_val.c_str(),
@@ -521,7 +383,7 @@ Status RuntimeFilterWrapper::assign(const PInFilter& 
in_filter, bool contain_nul
         break;
     }
     case TYPE_DECIMAL256: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             auto string_val = column.stringval();
             StringParser::ParseResult result;
             auto int_val = 
StringParser::string_to_int<wide::Int256>(string_val.c_str(),
@@ -534,7 +396,7 @@ Status RuntimeFilterWrapper::assign(const PInFilter& 
in_filter, bool contain_nul
     case TYPE_VARCHAR:
     case TYPE_CHAR:
     case TYPE_STRING: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             const std::string& string_value = column.stringval();
             // string_value is std::string, call insert(data, size) function 
in StringSet will not cast as StringRef
             // so could avoid some cast error at different class object.
@@ -543,14 +405,14 @@ Status RuntimeFilterWrapper::assign(const PInFilter& 
in_filter, bool contain_nul
         break;
     }
     case TYPE_IPV4: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             int32_t tmp = column.intval();
             set->insert(&tmp);
         });
         break;
     }
     case TYPE_IPV6: {
-        batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
+        _batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, 
PColumnValue& column) {
             auto string_val = column.stringval();
             StringParser::ParseResult result;
             auto int128_val = 
StringParser::string_to_int<uint128_t>(string_val.c_str(),
@@ -568,15 +430,13 @@ Status RuntimeFilterWrapper::assign(const PInFilter& 
in_filter, bool contain_nul
     return Status::OK();
 }
 
-Status RuntimeFilterWrapper::assign(const PBloomFilter& bloom_filter,
-                                    butil::IOBufAsZeroCopyInputStream* data, 
bool contain_null) {
+Status RuntimeFilterWrapper::_assign(const PBloomFilter& bloom_filter,
+                                     butil::IOBufAsZeroCopyInputStream* data, 
bool contain_null) {
     RETURN_IF_ERROR(_bloom_filter_func->assign(data, 
bloom_filter.filter_length(), contain_null));
     return Status::OK();
 }
 
-// used by shuffle runtime filter
-// assign this filter by protobuf
-Status RuntimeFilterWrapper::assign(const PMinMaxFilter& minmax_filter, bool 
contain_null) {
+Status RuntimeFilterWrapper::_assign(const PMinMaxFilter& minmax_filter, bool 
contain_null) {
     if (contain_null) {
         _minmax_func->set_null_aware(true);
         _minmax_func->set_contain_null();
@@ -721,10 +581,6 @@ Status RuntimeFilterWrapper::assign(const PMinMaxFilter& 
minmax_filter, bool con
     return Status::InternalError("not support!");
 }
 
-void RuntimeFilterWrapper::get_bloom_filter_desc(char** data, int* 
filter_length) {
-    _bloom_filter_func->get_data(data, filter_length);
-}
-
 bool RuntimeFilterWrapper::contain_null() const {
     if (get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
         return _bloom_filter_func->contain_null();
@@ -779,4 +635,10 @@ void RuntimeFilterWrapper::_to_protobuf(PMinMaxFilter* 
filter) {
     _minmax_func->to_pb(filter);
 }
 
+void RuntimeFilterWrapper::_to_protobuf(PBloomFilter* filter, char** data, 
int* filter_length) {
+    _bloom_filter_func->get_data(data, filter_length);
+    filter->set_filter_length(*filter_length);
+    filter->set_always_true(false);
+}
+
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h 
b/be/src/runtime_filter/runtime_filter_wrapper.h
index b41e72c7cea..a88f1baf869 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -37,78 +37,18 @@ public:
 
     RuntimeFilterWrapper(const RuntimeFilterParams* params);
     RuntimeFilterWrapper(PrimitiveType column_type, RuntimeFilterType type, 
uint32_t filter_id,
-                         State state)
+                         State state, int max_in_num = 0)
             : _column_return_type(column_type),
               _filter_type(type),
               _filter_id(filter_id),
-              _state(state) {}
-
-    Status change_to_bloom_filter();
-
-    bool is_valid() const { return _state != State::DISABLED && _state != 
State::IGNORED; }
-    int filter_id() const { return _filter_id; }
-
-    int max_in_num() const { return _max_in_num; }
-
-    bool build_bf_by_runtime_size() const;
-
-    Status init_bloom_filter(const size_t runtime_size);
-
-    void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const;
-
-    void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start);
-
-    void insert_batch(const vectorized::ColumnPtr& column, size_t start) {
-        if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
-            bitmap_filter_insert_batch(column, start);
-        } else {
-            insert_fixed_len(column, start);
-        }
-    }
-
-    void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t 
start);
-
-    RuntimeFilterType get_real_type() const {
-        if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-            if (_hybrid_set) {
-                return RuntimeFilterType::IN_FILTER;
-            }
-            return RuntimeFilterType::BLOOM_FILTER;
-        }
-        return _filter_type;
-    }
-
-    Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
-                          std::vector<vectorized::VRuntimeFilterPtr>& 
push_exprs,
-                          const TExpr& probe_expr);
+              _state(state),
+              _max_in_num(max_in_num) {}
 
+    Status init(const size_t runtime_size);
+    void insert(const vectorized::ColumnPtr& column, size_t start);
     Status merge(const RuntimeFilterWrapper* wrapper);
-
-    Status assign(const PInFilter& in_filter, bool contain_null);
-
-    // used by shuffle runtime filter
-    // assign this filter by protobuf
-    Status assign(const PBloomFilter& bloom_filter, 
butil::IOBufAsZeroCopyInputStream* data,
-                  bool contain_null);
-
-    // used by shuffle runtime filter
-    // assign this filter by protobuf
-    Status assign(const PMinMaxFilter& minmax_filter, bool contain_null);
-
-    void get_bloom_filter_desc(char** data, int* filter_length);
-
-    PrimitiveType column_type() { return _column_return_type; }
-
-    bool contain_null() const;
-
-    void batch_assign(const PInFilter& filter,
-                      void (*assign_func)(std::shared_ptr<HybridSetBase>& 
_hybrid_set,
-                                          PColumnValue&));
-
-    friend class RuntimeFilter;
-
     template <class T>
-    Status assign_data(const T& request, butil::IOBufAsZeroCopyInputStream* 
data) {
+    Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) {
         PFilterType filter_type = request.filter_type();
 
         if (request.has_disabled() && request.disabled()) {
@@ -126,24 +66,47 @@ public:
         switch (filter_type) {
         case PFilterType::IN_FILTER: {
             DCHECK(request.has_in_filter());
-            return assign(request.in_filter(), request.contain_null());
+            return _assign(request.in_filter(), request.contain_null());
         }
         case PFilterType::BLOOM_FILTER: {
             DCHECK(request.has_bloom_filter());
             _hybrid_set.reset(); // change in_or_bloom filter to bloom filter
-            return assign(request.bloom_filter(), data, 
request.contain_null());
+            return _assign(request.bloom_filter(), data, 
request.contain_null());
         }
         case PFilterType::MIN_FILTER:
         case PFilterType::MAX_FILTER:
         case PFilterType::MINMAX_FILTER: {
             DCHECK(request.has_minmax_filter());
-            return assign(request.minmax_filter(), request.contain_null());
+            return _assign(request.minmax_filter(), request.contain_null());
         }
         default:
             return Status::InternalError("unknown filter type {}", 
int(filter_type));
         }
     }
 
+    bool is_valid() const { return _state != State::DISABLED && _state != 
State::IGNORED; }
+    int filter_id() const { return _filter_id; }
+    bool build_bf_by_runtime_size() const;
+
+    RuntimeFilterType get_real_type() const {
+        if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+            if (_hybrid_set) {
+                return RuntimeFilterType::IN_FILTER;
+            }
+            return RuntimeFilterType::BLOOM_FILTER;
+        }
+        return _filter_type;
+    }
+
+    std::shared_ptr<MinMaxFuncBase> minmax_func() const { return _minmax_func; 
}
+    std::shared_ptr<HybridSetBase> hybrid_set() const { return _hybrid_set; }
+    std::shared_ptr<BloomFilterFuncBase> bloom_filter_func() const { return 
_bloom_filter_func; }
+    std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func() const { return 
_bitmap_filter_func; }
+
+    PrimitiveType column_type() const { return _column_return_type; }
+
+    bool contain_null() const;
+
     std::string debug_string() const;
 
     void set_state(State state, std::string reason = "") {
@@ -154,11 +117,8 @@ public:
         }
         _state = state;
     }
-
     void disable(std::string reason) { set_state(State::DISABLED, reason); }
-
     State get_state() const { return _state; }
-
     void check_state(std::vector<State> assumed_states) const {
         if (!check_state_impl<RuntimeFilterWrapper>(_state, assumed_states)) {
             throw Exception(ErrorCode::INTERNAL_ERROR,
@@ -166,7 +126,6 @@ public:
                             
states_to_string<RuntimeFilterWrapper>(assumed_states));
         }
     }
-
     static std::string to_string(const State& state) {
         switch (state) {
         case State::IGNORED:
@@ -182,22 +141,33 @@ public:
         }
     }
 
+private:
+    friend class RuntimeFilter;
+    void _insert(BloomFilterFuncBase* bloom_filter) const;
+    // used by shuffle runtime filter
+    // assign this filter by protobuf
+    Status _assign(const PInFilter& in_filter, bool contain_null);
+    Status _assign(const PBloomFilter& bloom_filter, 
butil::IOBufAsZeroCopyInputStream* data,
+                   bool contain_null);
+    Status _assign(const PMinMaxFilter& minmax_filter, bool contain_null);
+    void _batch_assign(const PInFilter& filter,
+                       void (*assign_func)(std::shared_ptr<HybridSetBase>& 
_hybrid_set,
+                                           PColumnValue&));
     void _to_protobuf(PInFilter* filter);
-
     void _to_protobuf(PMinMaxFilter* filter);
-
-private:
+    void _to_protobuf(PBloomFilter* filter, char** data, int* filter_length);
+    Status _change_to_bloom_filter();
     // When a runtime filter received from remote and it is a bloom filter, 
_column_return_type will be invalid.
-    PrimitiveType _column_return_type; // column type
-    RuntimeFilterType _filter_type;
-    int32_t _max_in_num;
-    uint32_t _filter_id;
+    const PrimitiveType _column_return_type; // column type
+    const RuntimeFilterType _filter_type;
+    const uint32_t _filter_id;
+    std::atomic<State> _state;
+    const int32_t _max_in_num;
 
     std::shared_ptr<MinMaxFuncBase> _minmax_func;
     std::shared_ptr<HybridSetBase> _hybrid_set;
     std::shared_ptr<BloomFilterFuncBase> _bloom_filter_func;
     std::shared_ptr<BitmapFilterFuncBase> _bitmap_filter_func;
-    std::atomic<State> _state;
     std::string _disabled_reason;
 };
 
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 31616cf0a57..5c850be40ad 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -222,7 +222,8 @@ Status VScanner::try_append_late_arrival_runtime_filter() {
     DCHECK(_applied_rf_num < _total_rf_num);
 
     int arrived_rf_num = 0;
-    
RETURN_IF_ERROR(_local_state->_helper.try_append_late_arrival_runtime_filter(&arrived_rf_num));
+    
RETURN_IF_ERROR(_local_state->_helper.try_append_late_arrival_runtime_filter(
+            &arrived_rf_num, _local_state->_conjuncts));
 
     if (arrived_rf_num == _applied_rf_num) {
         // No newly arrived runtime filters, just return;
diff --git a/be/src/vec/exprs/vbitmap_predicate.cpp 
b/be/src/vec/exprs/vbitmap_predicate.cpp
index 4e6f28950df..b09dfea4c3c 100644
--- a/be/src/vec/exprs/vbitmap_predicate.cpp
+++ b/be/src/vec/exprs/vbitmap_predicate.cpp
@@ -127,7 +127,7 @@ const std::string& 
vectorized::VBitmapPredicate::expr_name() const {
     return _expr_name;
 }
 
-void 
vectorized::VBitmapPredicate::set_filter(std::shared_ptr<BitmapFilterFuncBase>& 
filter) {
+void 
vectorized::VBitmapPredicate::set_filter(std::shared_ptr<BitmapFilterFuncBase> 
filter) {
     _filter = filter;
 }
 
diff --git a/be/src/vec/exprs/vbitmap_predicate.h 
b/be/src/vec/exprs/vbitmap_predicate.h
index bdb3ea2b00e..c37dfc69a3a 100644
--- a/be/src/vec/exprs/vbitmap_predicate.h
+++ b/be/src/vec/exprs/vbitmap_predicate.h
@@ -60,7 +60,7 @@ public:
 
     const std::string& expr_name() const override;
 
-    void set_filter(std::shared_ptr<BitmapFilterFuncBase>& filter);
+    void set_filter(std::shared_ptr<BitmapFilterFuncBase> filter);
 
     std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter_func() const 
override {
         return _filter;
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp 
b/be/src/vec/exprs/vbloom_predicate.cpp
index 139547ec96f..45aeca684ed 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -105,7 +105,8 @@ Status VBloomPredicate::execute(VExprContext* context, 
Block* block, int* result
 const std::string& VBloomPredicate::expr_name() const {
     return _expr_name;
 }
-void VBloomPredicate::set_filter(std::shared_ptr<BloomFilterFuncBase>& filter) 
{
+
+void VBloomPredicate::set_filter(std::shared_ptr<BloomFilterFuncBase> filter) {
     _filter = filter;
 }
 
diff --git a/be/src/vec/exprs/vbloom_predicate.h 
b/be/src/vec/exprs/vbloom_predicate.h
index 9ca5fdd0925..f5756e266da 100644
--- a/be/src/vec/exprs/vbloom_predicate.h
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -49,7 +49,7 @@ public:
                 FunctionContext::FunctionStateScope scope) override;
     void close(VExprContext* context, FunctionContext::FunctionStateScope 
scope) override;
     const std::string& expr_name() const override;
-    void set_filter(std::shared_ptr<BloomFilterFuncBase>& filter);
+    void set_filter(std::shared_ptr<BloomFilterFuncBase> filter);
 
     std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() const 
override { return _filter; }
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index d2e47844aa8..46b94860701 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -614,7 +614,7 @@ message PPublishFilterRequestV2 {
     optional int64 merge_time = 9;
     optional bool contain_null = 10;
     optional bool ignored = 11;
-    repeated int32 fragment_ids = 12;
+    repeated int32 fragment_ids = 12; // deprecated
     optional uint64 local_merge_time = 13;
     optional bool disabled = 14;
 };
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 43ada0bdae3..88acfed96db 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1267,7 +1267,7 @@ struct TRuntimeFilterDesc {
 
   // Indicates if there is at least one target scan node that is not in the 
same
   // fragment as the broadcast join that produced the runtime filter
-  7: required bool has_remote_targets
+  7: required bool has_remote_targets // deprecated
 
   // The type of runtime filter to build.
   8: required TRuntimeFilterType type


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to