This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6d7d1976064c4fe8aa2ce3926b07fe965fe78977
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Thu Feb 8 09:59:29 2024 +0800

    opt the rf code and remove useless rf code  (#30861)
---
 be/src/exprs/runtime_filter.cpp             | 110 +++++++++++++---------------
 be/src/exprs/runtime_filter.h               |  20 +----
 be/src/exprs/runtime_filter_slots.h         |   5 +-
 be/src/pipeline/exec/datagen_operator.cpp   |   8 +-
 be/src/runtime/fragment_mgr.cpp             |   8 +-
 be/src/runtime/runtime_filter_mgr.cpp       |  69 +++++++----------
 be/src/runtime/runtime_filter_mgr.h         |  10 +--
 be/src/vec/exec/runtime_filter_consumer.cpp |  15 ++--
 be/src/vec/exec/vdata_gen_scan_node.cpp     |   8 +-
 9 files changed, 97 insertions(+), 156 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 06d1c452fdd..47157cf74d2 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -366,11 +366,9 @@ public:
     BloomFilterFuncBase* get_bloomfilter() const { return 
_context.bloom_filter_func.get(); }
 
     void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) {
+        DCHECK(!is_ignored());
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
-            if (_is_ignored_in_filter) {
-                break;
-            }
             _context.hybrid_set->insert_fixed_len(column, start);
             break;
         }
@@ -465,14 +463,15 @@ public:
 
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
-            if (_is_ignored_in_filter) {
+            // only the IN filter can be marked as ignored at merge time
+            if (_ignored) {
                 break;
-            } else if (wrapper->_is_ignored_in_filter) {
+            } else if (wrapper->_ignored) {
                 VLOG_DEBUG << " ignore merge runtime filter(in filter id " << 
_filter_id
-                           << ") because: " << 
wrapper->get_ignored_in_filter_msg();
+                           << ") because: " << wrapper->ignored_msg();
 
-                _is_ignored_in_filter = true;
-                _ignored_in_filter_msg = wrapper->_ignored_in_filter_msg;
+                _ignored = true;
+                _ignored_msg = wrapper->_ignored_msg;
                 // release in filter
                 _context.hybrid_set.reset();
                 break;
@@ -480,17 +479,11 @@ public:
             // try insert set
             _context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
             if (_max_in_num >= 0 && _context.hybrid_set->size() >= 
_max_in_num) {
-#ifdef VLOG_DEBUG_IS_ON
-                std::stringstream msg;
-                msg << " ignore merge runtime filter(in filter id " << 
_filter_id
-                    << ") because: in_num(" << _context.hybrid_set->size() << 
") >= max_in_num("
-                    << _max_in_num << ")";
-                _ignored_in_filter_msg = std::string(msg.str());
-#else
-                _ignored_in_filter_msg = std::string("ignored");
-#endif
-                _is_ignored_in_filter = true;
-
+                _ignored_msg = fmt::format(
+                        " ignore merge runtime filter(in filter id {})"
+                        "because: in_num({}) >= max_in_num({})",
+                        _filter_id, _context.hybrid_set->size(), _max_in_num);
+                _ignored = true;
                 // release in filter
                 _context.hybrid_set.reset();
             }
@@ -520,10 +513,10 @@ public:
 
             if (real_filter_type == RuntimeFilterType::IN_FILTER) {
                 if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in 
merge in
-                    CHECK(!wrapper->_is_ignored_in_filter)
+                    CHECK(!wrapper->_ignored)
                             << " can not ignore merge runtime filter(in filter 
id "
                             << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
-                            << wrapper->get_ignored_in_filter_msg();
+                            << wrapper->ignored_msg();
                     
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
                     if (_max_in_num >= 0 && _context.hybrid_set->size() >= 
_max_in_num) {
                         VLOG_DEBUG << " change runtime filter to bloom 
filter(id=" << _filter_id
@@ -540,10 +533,10 @@ public:
                 }
             } else {
                 if (other_filter_type == RuntimeFilterType::IN_FILTER) { // 
bloom filter merge in
-                    CHECK(!wrapper->_is_ignored_in_filter)
+                    CHECK(!wrapper->_ignored)
                             << " can not ignore merge runtime filter(in filter 
id "
                             << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
-                            << wrapper->get_ignored_in_filter_msg();
+                            << wrapper->ignored_msg();
                     
wrapper->insert_to_bloom_filter(_context.bloom_filter_func.get());
                     // bloom filter merge bloom filter
                 } else {
@@ -563,8 +556,8 @@ public:
         if (in_filter->has_ignored_msg()) {
             VLOG_DEBUG << "Ignore in filter(id=" << _filter_id
                        << ") because: " << in_filter->ignored_msg();
-            _is_ignored_in_filter = true;
-            _ignored_in_filter_msg = in_filter->ignored_msg();
+            _ignored = true;
+            _ignored_msg = in_filter->ignored_msg();
             return Status::OK();
         }
 
@@ -893,9 +886,9 @@ public:
 
     bool is_bloomfilter() const { return _is_bloomfilter; }
 
-    bool is_ignored_in_filter() const { return _is_ignored_in_filter; }
+    bool is_ignored() const { return _ignored; }
 
-    const std::string& get_ignored_in_filter_msg() const { return 
_ignored_in_filter_msg; }
+    const std::string& ignored_msg() const { return _ignored_msg; }
 
     void batch_assign(const PInFilter* filter,
                       void (*assign_func)(std::shared_ptr<HybridSetBase>& 
_hybrid_set,
@@ -938,8 +931,8 @@ private:
 
     vectorized::SharedRuntimeFilterContext _context;
     bool _is_bloomfilter = false;
-    bool _is_ignored_in_filter = false;
-    std::string _ignored_in_filter_msg;
+    bool _ignored = false;
+    std::string _ignored_msg;
     uint32_t _filter_id;
 };
 
@@ -1022,7 +1015,7 @@ Status 
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
                                           std::vector<vectorized::VExprSPtr>& 
push_exprs,
                                           bool is_late_arrival) {
     DCHECK(is_consumer());
-    if (_is_ignored) {
+    if (_wrapper->is_ignored()) {
         return Status::OK();
     }
     if (!is_late_arrival) {
@@ -1153,11 +1146,16 @@ void 
IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTim
 }
 
 void IRuntimeFilter::set_ignored(const std::string& msg) {
-    _is_ignored = true;
-    if (_wrapper->_filter_type == RuntimeFilterType::IN_FILTER) {
-        _wrapper->_is_ignored_in_filter = true;
-        _wrapper->_ignored_in_filter_msg = msg;
-    }
+    _wrapper->_ignored = true;
+    _wrapper->_ignored_msg = msg;
+}
+
+std::string IRuntimeFilter::_format_status() const {
+    return fmt::format(
+            "[IsPushDown = {}, RuntimeFilterState = {}, IsIgnored = {}, 
HasRemoteTarget = {}, "
+            "HasLocalTarget = {}]",
+            _is_push_down, _get_explain_state_string(), 
_wrapper->is_ignored(), _has_remote_target,
+            _has_local_target);
 }
 
 BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
@@ -1223,7 +1221,6 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
     }
 
     _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));
-
     return _wrapper->init(&params);
 }
 
@@ -1348,14 +1345,11 @@ void 
IRuntimeFilter::update_runtime_filter_type_to_profile() {
 }
 
 Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
-    if (!_is_ignored && wrapper->is_ignored_in_filter()) {
-        set_ignored(wrapper->get_ignored_in_filter_msg());
+    if (!_wrapper->is_ignored() && wrapper->is_ignored()) {
+        set_ignored(wrapper->ignored_msg());
     }
     auto origin_type = _wrapper->get_real_type();
     Status status = _wrapper->merge(wrapper);
-    if (!_is_ignored && _wrapper->is_ignored_in_filter()) {
-        set_ignored(_wrapper->get_ignored_in_filter_msg());
-    }
     if (origin_type != _wrapper->get_real_type()) {
         update_runtime_filter_type_to_profile();
     }
@@ -1406,8 +1400,8 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
     auto column_type = _wrapper->column_type();
     filter->set_column_type(to_proto(column_type));
 
-    if (_is_ignored) {
-        filter->set_ignored_msg(_ignored_msg);
+    if (_wrapper->is_ignored()) {
+        filter->set_ignored_msg(_wrapper->ignored_msg());
         return;
     }
 
@@ -1704,22 +1698,20 @@ Status 
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
     auto real_filter_type = get_real_type();
     switch (real_filter_type) {
     case RuntimeFilterType::IN_FILTER: {
-        if (!_is_ignored_in_filter) {
-            TTypeDesc type_desc = 
create_type_desc(PrimitiveType::TYPE_BOOLEAN);
-            type_desc.__set_is_nullable(false);
-            TExprNode node;
-            node.__set_type(type_desc);
-            node.__set_node_type(TExprNodeType::IN_PRED);
-            node.in_predicate.__set_is_not_in(false);
-            node.__set_opcode(TExprOpcode::FILTER_IN);
-            node.__set_is_nullable(false);
-
-            auto in_pred = vectorized::VDirectInPredicate::create_shared(node);
-            in_pred->set_filter(_context.hybrid_set);
-            in_pred->add_child(probe_ctx->root());
-            auto wrapper = 
vectorized::VRuntimeFilterWrapper::create_shared(node, in_pred);
-            container.push_back(wrapper);
-        }
+        TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+        type_desc.__set_is_nullable(false);
+        TExprNode node;
+        node.__set_type(type_desc);
+        node.__set_node_type(TExprNodeType::IN_PRED);
+        node.in_predicate.__set_is_not_in(false);
+        node.__set_opcode(TExprOpcode::FILTER_IN);
+        node.__set_is_nullable(false);
+
+        auto in_pred = vectorized::VDirectInPredicate::create_shared(node);
+        in_pred->set_filter(_context.hybrid_set);
+        in_pred->add_child(probe_ctx->root());
+        auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, 
in_pred);
+        container.push_back(wrapper);
         break;
     }
     case RuntimeFilterType::MIN_FILTER: {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 9a7c1a2ae3c..f68c0ec250c 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -199,8 +199,6 @@ public:
               _rf_state_atomic(RuntimeFilterState::NOT_READY),
               _role(RuntimeFilterRole::PRODUCER),
               _expr_order(-1),
-              _always_true(false),
-              _is_ignored(false),
               registration_time_(MonotonicMillis()),
               _wait_infinitely(_state->runtime_filter_wait_infinitely),
               _rf_wait_time_ms(_state->runtime_filter_wait_time_ms),
@@ -381,15 +379,9 @@ protected:
 
     void _set_push_down() { _is_push_down = true; }
 
-    std::string _format_status() {
-        return fmt::format(
-                "[IsPushDown = {}, RuntimeFilterState = {}, IsIgnored = {}, 
HasRemoteTarget = {}, "
-                "HasLocalTarget = {}]",
-                _is_push_down, _get_explain_state_string(), _is_ignored, 
_has_remote_target,
-                _has_local_target);
-    }
+    std::string _format_status() const;
 
-    std::string _get_explain_state_string() {
+    std::string _get_explain_state_string() const {
         if (_enable_pipeline_exec) {
             return _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::READY
                            ? "READY"
@@ -430,16 +422,8 @@ protected:
 
     bool _is_push_down = false;
 
-    // if set always_true = true
-    // this filter won't filter any data
-    bool _always_true;
-
     TExpr _probe_expr;
 
-    // Indicate whether runtime filter expr has been ignored
-    bool _is_ignored;
-    std::string _ignored_msg;
-
     struct RPCContext;
 
     std::shared_ptr<RPCContext> _rpc_context;
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index 8f5dab22f8c..e1a1f871b95 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -48,11 +48,14 @@ public:
         std::map<int, bool> has_in_filter;
 
         auto ignore_local_filter = [&](int filter_id) {
+            // Pipeline X currently has a bug in the ignore logic; enable the ignore logic for pipeline X once that problem is fixed
             if (_is_global) {
                 return Status::OK();
             }
+            auto runtime_filter_mgr = state->runtime_filter_mgr();
+
             std::vector<IRuntimeFilter*> filters;
-            
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filters(filter_id, 
filters));
+            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
             if (filters.empty()) {
                 throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
                                 filter_id);
diff --git a/be/src/pipeline/exec/datagen_operator.cpp 
b/be/src/pipeline/exec/datagen_operator.cpp
index 46a0dacb78e..916ce62aa26 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -102,14 +102,10 @@ Status DataGenLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
         IRuntimeFilter* runtime_filter = nullptr;
         if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
             
RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, state->query_options(), p.node_id(), false));
-            
RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, p.node_id(), &runtime_filter));
+                    filter_desc, state->query_options(), p.node_id(), 
&runtime_filter, false));
         } else {
             
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, state->query_options(), p.node_id(), false));
-            RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, p.node_id(), &runtime_filter));
+                    filter_desc, state->query_options(), p.node_id(), 
&runtime_filter, false));
         }
         runtime_filter->init_profile(_runtime_profile.get());
     }
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3a9a7c2a88d..2eff857a3d3 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -635,11 +635,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
 
         
query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
         _set_scan_concurrency(params, query_ctx.get());
-
-        bool is_pipeline = false;
-        if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
-            is_pipeline = true;
-        }
+        const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, 
Params>;
 
         if (params.__isset.workload_groups && !params.workload_groups.empty()) 
{
             uint64_t tg_id = params.workload_groups[0].id;
@@ -1333,7 +1329,7 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
     int64_t start_apply = MonotonicMillis();
 
     const auto& fragment_instance_ids = request->fragment_instance_ids();
-    if (fragment_instance_ids.size() > 0) {
+    if (!fragment_instance_ids.empty()) {
         UniqueId fragment_instance_id = fragment_instance_ids[0];
         TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 3a01368b583..8281ae2ea6d 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -69,23 +69,6 @@ Status RuntimeFilterMgr::get_producer_filter(const int 
filter_id, IRuntimeFilter
     return Status::OK();
 }
 
-Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int 
node_id,
-                                            IRuntimeFilter** consumer_filter) {
-    std::lock_guard<std::mutex> l(_lock);
-    auto iter = _consumer_map.find(filter_id);
-    if (iter != _consumer_map.cend()) {
-        for (auto& item : iter->second) {
-            if (item.node_id == node_id) {
-                *consumer_filter = item.filter;
-                return Status::OK();
-            }
-        }
-    }
-
-    return Status::InvalidArgument("unknown filter, filter_id: {}, node_id: 
{}, role: CONSUMER",
-                                   filter_id, node_id);
-}
-
 Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
                                              std::vector<IRuntimeFilter*>& 
consumer_filters) {
     std::lock_guard<std::mutex> l(_lock);
@@ -101,16 +84,17 @@ Status RuntimeFilterMgr::get_consume_filters(const int 
filter_id,
 
 Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& 
desc,
                                                   const TQueryOptions& 
options, int node_id,
+                                                  IRuntimeFilter** 
consumer_filter,
                                                   bool build_bf_exactly, bool 
is_global) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
+    bool has_exist = false;
 
     std::lock_guard<std::mutex> l(_lock);
-    auto iter = _consumer_map.find(key);
-    bool has_exist = false;
-    if (iter != _consumer_map.end()) {
+    if (auto iter = _consumer_map.find(key); iter != _consumer_map.end()) {
         for (auto holder : iter->second) {
             if (holder.node_id == node_id) {
+                *consumer_filter = holder.filter;
                 has_exist = true;
             }
         }
@@ -128,6 +112,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const 
TRuntimeFilterDesc& desc
                 _state, remote_opt_or_global ? _state->obj_pool() : &_pool, 
&desc, &options,
                 RuntimeFilterRole::CONSUMER, node_id, &filter, 
build_bf_exactly, is_global));
         _consumer_map[key].emplace_back(node_id, filter);
+        *consumer_filter = filter;
     } else if (!remote_opt_or_global) {
         return Status::InvalidArgument("filter has registered");
     }
@@ -201,7 +186,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
             new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, 
runtime_filter_desc));
 
     auto filter_id = runtime_filter_desc->filter_id;
-    // LOG(INFO) << "entity filter id:" << filter_id;
     
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, 
query_options,
                                                     -1, false));
     _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, 
std::make_unique<std::mutex>()});
@@ -224,7 +208,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
             new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, 
runtime_filter_desc));
 
     auto filter_id = runtime_filter_desc->filter_id;
-    // LOG(INFO) << "entity filter id:" << filter_id;
     
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, 
query_options));
     _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, 
std::make_unique<std::mutex>()});
     return Status::OK();
@@ -281,7 +264,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
                                                  
butil::IOBufAsZeroCopyInputStream* attach_data,
                                                  bool opt_remote_rf) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
-    std::shared_ptr<RuntimeFilterCntlVal> cntVal;
+    std::shared_ptr<RuntimeFilterCntlVal> cnt_val;
     int merged_size = 0;
     int64_t merge_time = 0;
     int64_t start_merge = MonotonicMillis();
@@ -296,39 +279,39 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
                                            
std::to_string(request->filter_id()));
         }
     }
-    cntVal = iter->second.first;
+    cnt_val = iter->second.first;
     {
         std::lock_guard<std::mutex> l(*iter->second.second);
         // Skip the other broadcast join runtime filter
-        if (cntVal->arrive_id.size() == 1 && 
cntVal->runtime_filter_desc.is_broadcast_join) {
+        if (cnt_val->arrive_id.size() == 1 && 
cnt_val->runtime_filter_desc.is_broadcast_join) {
             return Status::OK();
         }
         MergeRuntimeFilterParams params(request, attach_data);
-        ObjectPool* pool = cntVal->pool.get();
+        ObjectPool* pool = cnt_val->pool.get();
         RuntimeFilterWrapperHolder holder;
         RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, &params, pool, 
holder.getHandle()));
-        RETURN_IF_ERROR(cntVal->filter->merge_from(holder.getHandle()->get()));
-        cntVal->arrive_id.insert(UniqueId(request->fragment_instance_id()));
-        merged_size = cntVal->arrive_id.size();
+        
RETURN_IF_ERROR(cnt_val->filter->merge_from(holder.getHandle()->get()));
+        cnt_val->arrive_id.insert(UniqueId(request->fragment_instance_id()));
+        merged_size = cnt_val->arrive_id.size();
         // TODO: avoid log when we had acquired a lock
-        VLOG_ROW << "merge size:" << merged_size << ":" << 
cntVal->producer_size;
-        DCHECK_LE(merged_size, cntVal->producer_size);
-        cntVal->merge_time += (MonotonicMillis() - start_merge);
-        merge_time = cntVal->merge_time;
+        VLOG_ROW << "merge size:" << merged_size << ":" << 
cnt_val->producer_size;
+        DCHECK_LE(merged_size, cnt_val->producer_size);
+        cnt_val->merge_time += (MonotonicMillis() - start_merge);
+        merge_time = cnt_val->merge_time;
     }
 
-    if (merged_size == cntVal->producer_size) {
+    if (merged_size == cnt_val->producer_size) {
         if (opt_remote_rf) {
-            DCHECK_GT(cntVal->targetv2_info.size(), 0);
-            DCHECK(cntVal->filter->is_bloomfilter());
+            DCHECK_GT(cnt_val->targetv2_info.size(), 0);
+            DCHECK(cnt_val->filter->is_bloomfilter());
             // Optimize merging phase iff:
             // 1. All BE has been upgraded (e.g. _opt_remote_rf)
-            // 2. FE has been upgraded (e.g. cntVal->targetv2_info.size() > 0)
+            // 2. FE has been upgraded (e.g. cnt_val->targetv2_info.size() > 0)
             // 3. This filter is bloom filter (only bloom filter should be 
used for merging)
             using PPublishFilterRpcContext =
                     AsyncRPCContext<PPublishFilterRequestV2, 
PPublishFilterResponse>;
             std::vector<std::unique_ptr<PPublishFilterRpcContext>> 
rpc_contexts;
-            rpc_contexts.reserve(cntVal->targetv2_info.size());
+            rpc_contexts.reserve(cnt_val->targetv2_info.size());
 
             butil::IOBuf request_attachment;
 
@@ -337,13 +320,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             void* data = nullptr;
             int len = 0;
             bool has_attachment = false;
-            RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, 
&len));
+            RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, 
&len));
             if (data != nullptr && len > 0) {
                 request_attachment.append(data, len);
                 has_attachment = true;
             }
 
-            std::vector<TRuntimeFilterTargetParamsV2>& targets = 
cntVal->targetv2_info;
+            std::vector<TRuntimeFilterTargetParamsV2>& targets = 
cnt_val->targetv2_info;
             for (size_t i = 0; i < targets.size(); i++) {
                 rpc_contexts.emplace_back(new PPublishFilterRpcContext);
                 size_t cur = rpc_contexts.size() - 1;
@@ -393,7 +376,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             using PPublishFilterRpcContext =
                     AsyncRPCContext<PPublishFilterRequest, 
PPublishFilterResponse>;
             std::vector<std::unique_ptr<PPublishFilterRpcContext>> 
rpc_contexts;
-            rpc_contexts.reserve(cntVal->target_info.size());
+            rpc_contexts.reserve(cnt_val->target_info.size());
 
             butil::IOBuf request_attachment;
 
@@ -402,13 +385,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             void* data = nullptr;
             int len = 0;
             bool has_attachment = false;
-            RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, 
&len));
+            RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, 
&len));
             if (data != nullptr && len > 0) {
                 request_attachment.append(data, len);
                 has_attachment = true;
             }
 
-            std::vector<TRuntimeFilterTargetParams>& targets = 
cntVal->target_info;
+            std::vector<TRuntimeFilterTargetParams>& targets = 
cnt_val->target_info;
             for (size_t i = 0; i < targets.size(); i++) {
                 rpc_contexts.emplace_back(new PPublishFilterRpcContext);
                 size_t cur = rpc_contexts.size() - 1;
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index 24ab78464db..de55e34fc1e 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -71,17 +71,14 @@ public:
 
     ~RuntimeFilterMgr() = default;
 
-    Status get_consume_filter(const int filter_id, const int node_id,
-                              IRuntimeFilter** consumer_filter);
-
     Status get_consume_filters(const int filter_id, 
std::vector<IRuntimeFilter*>& consumer_filters);
 
     Status get_producer_filter(const int filter_id, IRuntimeFilter** 
producer_filter);
 
     // register filter
     Status register_consumer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
-                                    int node_id, bool build_bf_exactly = false,
-                                    bool is_global = false);
+                                    int node_id, IRuntimeFilter** 
consumer_filter,
+                                    bool build_bf_exactly = false, bool 
is_global = false);
     Status register_producer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
                                     bool build_bf_exactly = false, bool 
is_global = false,
                                     int parallel_tasks = 0);
@@ -149,9 +146,6 @@ public:
         std::shared_ptr<ObjectPool> pool;
     };
 
-public:
-    RuntimeFilterCntlVal* get_filter(int id) { return 
_filter_map[id].first.get(); }
-
 private:
     Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
                            const TQueryOptions* query_options,
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index b7146c7c6a7..5e2d90bf62d 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -58,20 +58,17 @@ Status RuntimeFilterConsumer::_register_runtime_filter(bool 
is_global) {
             // 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
             // 2. This filter is bloom filter (only bloom filter should be 
used for merging)
             
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, _state->query_options(), _filter_id, false, 
is_global));
-            
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, _filter_id, &runtime_filter));
+                    filter_desc, _state->query_options(), _filter_id, 
&runtime_filter, false,
+                    is_global));
         } else if (is_global) {
             // For pipelineX engine, runtime filter is global iff data 
distribution is ignored.
             
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, _state->query_options(), _filter_id, false, 
is_global));
-            
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, _filter_id, &runtime_filter));
+                    filter_desc, _state->query_options(), _filter_id, 
&runtime_filter, false,
+                    is_global));
         } else {
             
RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, _state->query_options(), _filter_id, false, 
is_global));
-            RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, _filter_id, &runtime_filter));
+                    filter_desc, _state->query_options(), _filter_id, 
&runtime_filter, false,
+                    is_global));
         }
         _runtime_filter_ctxs.emplace_back(runtime_filter);
         _runtime_filter_ready_flag.emplace_back(false);
diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp 
b/be/src/vec/exec/vdata_gen_scan_node.cpp
index 6c5db2c0161..42f6250a030 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.cpp
+++ b/be/src/vec/exec/vdata_gen_scan_node.cpp
@@ -83,14 +83,10 @@ Status VDataGenFunctionScanNode::prepare(RuntimeState* 
state) {
         IRuntimeFilter* runtime_filter = nullptr;
         if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
             
RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, state->query_options(), id(), false));
-            
RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, id(), &runtime_filter));
+                    filter_desc, state->query_options(), id(), 
&runtime_filter, false));
         } else {
             
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter(
-                    filter_desc, state->query_options(), id(), false));
-            
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
-                                                                            
id(), &runtime_filter));
+                    filter_desc, state->query_options(), id(), 
&runtime_filter, false));
         }
         runtime_filter->init_profile(_runtime_profile.get());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to