This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 843f7b5883a [fix](runtime filter) Fix runtime filter producers (#44293)
843f7b5883a is described below

commit 843f7b5883ac652a58da3fce2cf8f1e5587c7dfa
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Wed Nov 20 09:20:25 2024 +0800

    [fix](runtime filter) Fix runtime filter producers (#44293)
    
    A runtime filter producer may have multiple targets, some of which are
    managed by the global RF mgr and others by the local RF mgr. To handle
    this, the producer is shared by both the global mgr and the local mgr.
    With this PR, a producer is always created by the local mgr, so it can
    always be found through the queryCtx's RF mgr.
---
 be/src/exprs/runtime_filter.cpp       | 14 +++++++-------
 be/src/exprs/runtime_filter.h         | 15 +++++----------
 be/src/runtime/runtime_filter_mgr.cpp | 27 ++++++++++++++-------------
 be/src/runtime/runtime_filter_mgr.h   |  3 ++-
 be/src/runtime/runtime_state.cpp      |  9 ++++++---
 5 files changed, 34 insertions(+), 34 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index f28cc53dcb8..b7af2561fe0 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -976,8 +976,8 @@ private:
 Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
                               const TQueryOptions* query_options, const RuntimeFilterRole role,
                               int node_id, std::shared_ptr<IRuntimeFilter>* res,
-                              bool build_bf_exactly, bool need_local_merge) {
-    *res = std::make_shared<IRuntimeFilter>(state, desc, need_local_merge);
+                              bool build_bf_exactly) {
+    *res = std::make_shared<IRuntimeFilter>(state, desc);
     (*res)->set_role(role);
     return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
 }
@@ -1311,10 +1311,10 @@ bool IRuntimeFilter::get_ignored() {
 
 std::string IRuntimeFilter::formatted_state() const {
     return fmt::format(
-            "[IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
+            "[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
             "HasLocalTarget = {}, Ignored = {}]",
-            _is_push_down, _get_explain_state_string(), _has_remote_target, _has_local_target,
-            _wrapper->_context->ignored);
+            _filter_id, _is_push_down, _get_explain_state_string(), _has_remote_target,
+            _has_local_target, _wrapper->_context->ignored);
 }
 
 Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
@@ -1505,9 +1505,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {
 
 std::string IRuntimeFilter::debug_string() const {
     return fmt::format(
-            "RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, is_broadcast: {}, "
+            "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, "
             "build_bf_cardinality: {}",
-            _filter_id, to_string(_runtime_filter_type), _need_local_merge, _is_broadcast_join,
+            _filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
             _wrapper->get_build_bf_cardinality());
 }
 
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 9e0e93433d5..6632c5dc872 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -190,8 +190,7 @@ enum RuntimeFilterState {
 /// that can be pushed down to node based on the results of the right table.
 class IRuntimeFilter {
 public:
-    IRuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
-                   bool need_local_merge = false)
+    IRuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc)
             : _state(state),
               _filter_id(desc->filter_id),
               _is_broadcast_join(true),
@@ -204,17 +203,16 @@ public:
               _wait_infinitely(_state->get_query_ctx()->runtime_filter_wait_infinitely()),
               _rf_wait_time_ms(_state->get_query_ctx()->runtime_filter_wait_time_ms()),
               _runtime_filter_type(get_runtime_filter_type(desc)),
-              _profile(
-                      new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})",
-                                                     _filter_id, to_string(_runtime_filter_type)))),
-              _need_local_merge(need_local_merge) {}
+              _profile(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})",
+                                                      _filter_id,
+                                                      to_string(_runtime_filter_type)))) {}
 
     ~IRuntimeFilter() = default;
 
     static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
                          const TQueryOptions* query_options, const RuntimeFilterRole role,
                          int node_id, std::shared_ptr<IRuntimeFilter>* res,
-                         bool build_bf_exactly = false, bool need_local_merge = false);
+                         bool build_bf_exactly = false);
 
     RuntimeFilterContextSPtr& get_shared_context_ref();
 
@@ -414,9 +412,6 @@ protected:
     // parent profile
     // only effect on consumer
     std::unique_ptr<RuntimeProfile> _profile;
-    // `_need_local_merge` indicates whether this runtime filter is global on this BE.
-    // All runtime filters should be merged on each BE before push_to_remote or publish.
-    bool _need_local_merge = false;
 
     std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
 
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 31b9ec3b0c2..b11e8290d96 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -109,8 +109,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
     if (!has_exist) {
         std::shared_ptr<IRuntimeFilter> filter;
         RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::CONSUMER,
-                                               node_id, &filter, build_bf_exactly,
-                                               need_local_merge));
+                                               node_id, &filter, build_bf_exactly));
         _consumer_map[key].emplace_back(node_id, filter);
         *consumer_filter = filter;
     } else if (!need_local_merge) {
@@ -122,7 +121,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
 
 Status RuntimeFilterMgr::register_local_merge_producer_filter(
         const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options,
-        std::shared_ptr<IRuntimeFilter>* producer_filter, bool build_bf_exactly) {
+        std::shared_ptr<IRuntimeFilter> producer_filter, bool build_bf_exactly) {
     DCHECK(_is_global);
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
@@ -138,21 +137,19 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter(
     }
 
     DCHECK(_state != nullptr);
-    RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::PRODUCER, -1,
-                                           producer_filter, build_bf_exactly, true));
     {
         std::lock_guard<std::mutex> l(*iter->second.lock);
         if (iter->second.filters.empty()) {
             std::shared_ptr<IRuntimeFilter> merge_filter;
             RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
                                                    RuntimeFilterRole::PRODUCER, -1, &merge_filter,
-                                                   build_bf_exactly, true));
+                                                   build_bf_exactly));
             merge_filter->set_ignored();
             iter->second.filters.emplace_back(merge_filter);
         }
         iter->second.merge_time++;
         iter->second.merge_size_times++;
-        iter->second.filters.emplace_back(*producer_filter);
+        iter->second.filters.emplace_back(producer_filter);
     }
     return Status::OK();
 }
@@ -173,6 +170,16 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(
     return Status::OK();
 }
 
+doris::LocalMergeFilters* RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id) {
+    DCHECK(_is_global);
+    std::lock_guard<std::mutex> l(_lock);
+    auto iter = _local_merge_producer_map.find(filter_id);
+    if (iter == _local_merge_producer_map.end()) {
+        return nullptr;
+    }
+    return &iter->second;
+}
+
 Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc,
                                                   const TQueryOptions& options,
                                                   std::shared_ptr<IRuntimeFilter>* producer_filter,
@@ -378,12 +385,6 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
 }
 
 Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
-    auto filter = try_get_product_filter(request->filter_id());
-    if (filter) {
-        filter->set_synced_size(request->filter_size());
-        return Status::OK();
-    }
-
     LocalMergeFilters* local_merge_filters = nullptr;
     RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(), &local_merge_filters));
     // first filter size merged filter
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 53520e43a55..dce051ab0d6 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -102,10 +102,11 @@ public:
 
     Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc,
                                                 const TQueryOptions& options,
-                                                std::shared_ptr<IRuntimeFilter>* producer_filter,
+                                                std::shared_ptr<IRuntimeFilter> producer_filter,
                                                 bool build_bf_exactly = false);
 
     Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters);
+    LocalMergeFilters* get_local_merge_producer_filters(int filter_id);
 
     Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options,
                                     std::shared_ptr<IRuntimeFilter>* producer_filter,
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 34b3866febf..116ac95bd36 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -518,10 +518,13 @@ RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() {
 Status RuntimeState::register_producer_runtime_filter(
         const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>* producer_filter,
         bool build_bf_exactly) {
-    RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
+    // Producers are created by local runtime filter mgr and shared by global runtime filter manager.
+    // When RF is published, consumers in both global and local RF mgr will be found.
+    RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
             desc, query_options(), producer_filter, build_bf_exactly));
-    return local_runtime_filter_mgr()->register_producer_filter(desc, query_options(),
-                                                                producer_filter, build_bf_exactly);
+    RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
+            desc, query_options(), *producer_filter, build_bf_exactly));
+    return Status::OK();
 }
 
 Status RuntimeState::register_consumer_runtime_filter(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to