This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 144204fecce872f260c1aedfa514ee1f88c299e2
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Fri Jan 26 10:16:17 2024 +0800

    [Refactor](Rf) refactor the code of runtime filter (#30268)
---
 be/src/pipeline/pipeline_fragment_context.cpp      |   1 -
 .../pipeline_x/pipeline_x_fragment_context.cpp     |   1 -
 be/src/runtime/fragment_mgr.cpp                    |   6 +-
 be/src/runtime/plan_fragment_executor.cpp          |   1 -
 be/src/runtime/runtime_filter_mgr.cpp              | 182 +++++----------------
 be/src/runtime/runtime_filter_mgr.h                |  46 ++++--
 6 files changed, 75 insertions(+), 162 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 2341071d963..5e8af4940b8 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -243,7 +243,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
 
     // TODO should be combine with plan_fragment_executor.prepare funciton
     SCOPED_ATTACH_TASK(_runtime_state.get());
-    static_cast<void>(_runtime_state->runtime_filter_mgr()->init());
     _runtime_state->set_be_number(local_params.backend_num);
 
     if (request.__isset.backend_id) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index de0a544f18c..b639603d19e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -533,7 +533,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         if (local_params.__isset.runtime_filter_params) {
             
runtime_filter_mgr->set_runtime_filter_params(local_params.runtime_filter_params);
         }
-        RETURN_IF_ERROR(runtime_filter_mgr->init());
         filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
 
         _runtime_filter_states.push_back(std::move(filterparams));
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 403c2463c2a..c38365d6a0c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -778,7 +778,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
     std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
     // TODO need check the status, but when I add return_if_error the P0 will 
not pass
     static_cast<void>(_runtimefilter_controller.add_entity(
-            params, &handler,
+            params.params, params.params.query_id, params.query_options, 
&handler,
             
RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
     fragment_executor->set_merge_controller_handler(handler);
     {
@@ -866,7 +866,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         for (size_t i = 0; i < params.local_params.size(); i++) {
             std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
             static_cast<void>(_runtimefilter_controller.add_entity(
-                    params, params.local_params[i], &handler,
+                    params.local_params[i], params.query_id, 
params.query_options, &handler,
                     
RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId()))));
             context->set_merge_controller_handler(handler);
             const TUniqueId& fragment_instance_id = 
params.local_params[i].fragment_instance_id;
@@ -946,7 +946,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 
             std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
             static_cast<void>(_runtimefilter_controller.add_entity(
-                    params, local_params, &handler,
+                    local_params, params.query_id, params.query_options, 
&handler,
                     
RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId()))));
             context->set_merge_controller_handler(handler);
 
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 2e8735c013d..0e4ecd7f5a7 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -133,7 +133,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
     _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
 
     SCOPED_ATTACH_TASK(_runtime_state.get());
-    static_cast<void>(_runtime_state->runtime_filter_mgr()->init());
     _runtime_state->set_be_number(request.backend_num);
     if (request.__isset.backend_id) {
         _runtime_state->set_backend_id(request.backend_id);
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 73a043070e3..29bf22535ed 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -54,21 +54,15 @@ struct AsyncRPCContext {
 RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, 
RuntimeFilterParamsContext* state) {
     _state = state;
     _state->runtime_filter_mgr = this;
-}
-
-Status RuntimeFilterMgr::init() {
     _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
                                             
ExecEnv::GetInstance()->experimental_mem_tracker());
-    return Status::OK();
 }
 
 Status RuntimeFilterMgr::get_producer_filter(const int filter_id, 
IRuntimeFilter** target) {
-    int32_t key = filter_id;
-
     std::lock_guard<std::mutex> l(_lock);
-    auto iter = _producer_map.find(key);
+    auto iter = _producer_map.find(filter_id);
     if (iter == _producer_map.end()) {
-        return Status::InvalidArgument("unknown filter: {}, role: PRODUCER", 
key);
+        return Status::InvalidArgument("unknown filter: {}, role: PRODUCER", 
filter_id);
     }
 
     *target = iter->second;
@@ -94,11 +88,10 @@ Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int node_
 
 Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
                                              std::vector<IRuntimeFilter*>& 
consumer_filters) {
-    int32_t key = filter_id;
     std::lock_guard<std::mutex> l(_lock);
-    auto iter = _consumer_map.find(key);
+    auto iter = _consumer_map.find(filter_id);
     if (iter == _consumer_map.end()) {
-        return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", 
key);
+        return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", 
filter_id);
     }
     for (auto& holder : iter->second) {
         consumer_filters.emplace_back(holder.filter);
@@ -114,52 +107,31 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
 
     std::lock_guard<std::mutex> l(_lock);
     auto iter = _consumer_map.find(key);
-    if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && 
desc.has_remote_targets &&
-        desc.type == TRuntimeFilterType::BLOOM) {
-        // if this runtime filter has remote target (e.g. need merge), we 
reuse the runtime filter between all instances
-
-        iter = _consumer_map.find(key);
-        if (iter != _consumer_map.end()) {
-            for (auto holder : iter->second) {
-                if (holder.node_id == node_id) {
-                    return Status::OK();
-                }
-            }
-        }
-        IRuntimeFilter* filter;
-        RETURN_IF_ERROR(IRuntimeFilter::create(_state, _state->obj_pool(), 
&desc, &options,
-                                               RuntimeFilterRole::CONSUMER, 
node_id, &filter,
-                                               build_bf_exactly));
-        _consumer_map[key].emplace_back(node_id, filter);
-    } else if (is_global) {
-        if (iter != _consumer_map.end()) {
-            for (auto holder : iter->second) {
-                if (holder.node_id == node_id) {
-                    return Status::OK();
-                }
+    bool has_exist = false;
+    if (iter != _consumer_map.end()) {
+        for (auto holder : iter->second) {
+            if (holder.node_id == node_id) {
+                has_exist = true;
             }
         }
+    }
 
-        IRuntimeFilter* filter;
-        RETURN_IF_ERROR(IRuntimeFilter::create(_state, _state->obj_pool(), 
&desc, &options,
-                                               RuntimeFilterRole::CONSUMER, 
node_id, &filter,
-                                               build_bf_exactly, is_global));
-        _consumer_map[key].emplace_back(node_id, filter);
-    } else {
-        if (iter != _consumer_map.end()) {
-            for (auto holder : iter->second) {
-                if (holder.node_id == node_id) {
-                    return Status::InvalidArgument("filter has registered");
-                }
-            }
-        }
+    // TODO: make the two case as one case to judge
+    bool remote_opt_or_global =
+            (desc.__isset.opt_remote_rf && desc.opt_remote_rf && 
desc.has_remote_targets &&
+             desc.type == TRuntimeFilterType::BLOOM) ||
+            is_global;
 
+    if (!has_exist) {
         IRuntimeFilter* filter;
-        RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
-                                               RuntimeFilterRole::CONSUMER, 
node_id, &filter,
-                                               build_bf_exactly));
+        RETURN_IF_ERROR(IRuntimeFilter::create(
+                _state, remote_opt_or_global ? _state->obj_pool() : &_pool, 
&desc, &options,
+                RuntimeFilterRole::CONSUMER, node_id, &filter, 
build_bf_exactly, is_global));
         _consumer_map[key].emplace_back(node_id, filter);
+    } else if (!remote_opt_or_global) {
+        return Status::InvalidArgument("filter has registered");
     }
+
     return Status::OK();
 }
 
@@ -218,21 +190,21 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
         const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
         const int producer_size) {
     std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
-    std::shared_ptr<RuntimeFilterCntlVal> cntVal = 
std::make_shared<RuntimeFilterCntlVal>();
+    std::shared_ptr<RuntimeFilterCntlVal> cnt_val = 
std::make_shared<RuntimeFilterCntlVal>();
     // runtime_filter_desc and target will be released,
-    // so we need to copy to cntVal
-    cntVal->producer_size = producer_size;
-    cntVal->runtime_filter_desc = *runtime_filter_desc;
-    cntVal->target_info = *target_info;
-    cntVal->pool.reset(new ObjectPool());
-    cntVal->filter = cntVal->pool->add(
+    // so we need to copy to cnt_val
+    cnt_val->producer_size = producer_size;
+    cnt_val->runtime_filter_desc = *runtime_filter_desc;
+    cnt_val->target_info = *target_info;
+    cnt_val->pool.reset(new ObjectPool());
+    cnt_val->filter = cnt_val->pool->add(
             new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, 
runtime_filter_desc));
 
     auto filter_id = runtime_filter_desc->filter_id;
     // LOG(INFO) << "entity filter id:" << filter_id;
-    static_cast<void>(
-            cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, 
query_options, -1, false));
-    _filter_map.emplace(filter_id, CntlValwithLock {cntVal, 
std::make_unique<std::mutex>()});
+    
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, 
query_options,
+                                                    -1, false));
+    _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, 
std::make_unique<std::mutex>()});
     return Status::OK();
 }
 
@@ -241,20 +213,20 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
         const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
         const int producer_size) {
     std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
-    std::shared_ptr<RuntimeFilterCntlVal> cntVal = 
std::make_shared<RuntimeFilterCntlVal>();
+    std::shared_ptr<RuntimeFilterCntlVal> cnt_val = 
std::make_shared<RuntimeFilterCntlVal>();
     // runtime_filter_desc and target will be released,
-    // so we need to copy to cntVal
-    cntVal->producer_size = producer_size;
-    cntVal->runtime_filter_desc = *runtime_filter_desc;
-    cntVal->targetv2_info = *targetv2_info;
-    cntVal->pool.reset(new ObjectPool());
-    cntVal->filter = cntVal->pool->add(
+    // so we need to copy to cnt_val
+    cnt_val->producer_size = producer_size;
+    cnt_val->runtime_filter_desc = *runtime_filter_desc;
+    cnt_val->targetv2_info = *targetv2_info;
+    cnt_val->pool.reset(new ObjectPool());
+    cnt_val->filter = cnt_val->pool->add(
             new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, 
runtime_filter_desc));
 
     auto filter_id = runtime_filter_desc->filter_id;
     // LOG(INFO) << "entity filter id:" << filter_id;
-    
static_cast<void>(cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, 
query_options));
-    _filter_map.emplace(filter_id, CntlValwithLock {cntVal, 
std::make_unique<std::mutex>()});
+    
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, 
query_options));
+    _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, 
std::make_unique<std::mutex>()});
     return Status::OK();
 }
 
@@ -308,7 +280,6 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
 Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* 
request,
                                                  
butil::IOBufAsZeroCopyInputStream* attach_data,
                                                  bool opt_remote_rf) {
-    _opt_remote_rf = _opt_remote_rf && opt_remote_rf;
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     std::shared_ptr<RuntimeFilterCntlVal> cntVal;
     int merged_size = 0;
@@ -485,70 +456,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
     return Status::OK();
 }
 
-Status RuntimeFilterMergeController::add_entity(
-        const TExecPlanFragmentParams& params,
-        std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle,
-        RuntimeFilterParamsContext* state) {
-    if (!params.params.__isset.runtime_filter_params ||
-        params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) 
{
-        return Status::OK();
-    }
-
-    runtime_filter_merge_entity_closer entity_closer =
-            std::bind(runtime_filter_merge_entity_close, this, 
std::placeholders::_1);
-
-    UniqueId query_id(params.params.query_id);
-    std::string query_id_str = query_id.to_string();
-    UniqueId fragment_instance_id = 
UniqueId(params.params.fragment_instance_id);
-    uint32_t shard = _get_controller_shard_idx(query_id);
-    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
-    auto iter = _filter_controller_map[shard].find(query_id_str);
-
-    if (iter == _filter_controller_map[shard].end()) {
-        *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
-                new RuntimeFilterMergeControllerEntity(state), entity_closer);
-        _filter_controller_map[shard][query_id_str] = *handle;
-        const TRuntimeFilterParams& filter_params = 
params.params.runtime_filter_params;
-        RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, 
filter_params,
-                                            params.query_options));
-    } else {
-        *handle = _filter_controller_map[shard][query_id_str].lock();
-    }
-    return Status::OK();
-}
-
-Status RuntimeFilterMergeController::add_entity(
-        const TPipelineFragmentParams& params, const TPipelineInstanceParams& 
local_params,
-        std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle,
-        RuntimeFilterParamsContext* state) {
-    if (!local_params.__isset.runtime_filter_params ||
-        local_params.runtime_filter_params.rid_to_runtime_filter.size() == 0) {
-        return Status::OK();
-    }
-
-    runtime_filter_merge_entity_closer entity_closer =
-            std::bind(runtime_filter_merge_entity_close, this, 
std::placeholders::_1);
-
-    UniqueId query_id(params.query_id);
-    std::string query_id_str = query_id.to_string();
-    UniqueId fragment_instance_id = 
UniqueId(local_params.fragment_instance_id);
-    uint32_t shard = _get_controller_shard_idx(query_id);
-    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
-    auto iter = _filter_controller_map[shard].find(query_id_str);
-
-    if (iter == _filter_controller_map[shard].end()) {
-        *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
-                new RuntimeFilterMergeControllerEntity(state), entity_closer);
-        _filter_controller_map[shard][query_id_str] = *handle;
-        const TRuntimeFilterParams& filter_params = 
local_params.runtime_filter_params;
-        RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, 
filter_params,
-                                            params.query_options));
-    } else {
-        *handle = _filter_controller_map[shard][query_id_str].lock();
-    }
-    return Status::OK();
-}
-
 Status RuntimeFilterMergeController::acquire(
         UniqueId query_id, 
std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
     uint32_t shard = _get_controller_shard_idx(query_id);
@@ -573,13 +480,6 @@ Status RuntimeFilterMergeController::remove_entity(UniqueId query_id) {
     return Status::OK();
 }
 
-// auto called while call ~std::shared_ptr<RuntimeFilterMergeControllerEntity>
-void runtime_filter_merge_entity_close(RuntimeFilterMergeController* 
controller,
-                                       RuntimeFilterMergeControllerEntity* 
entity) {
-    static_cast<void>(controller->remove_entity(entity->query_id()));
-    delete entity;
-}
-
 RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* 
state) {
     RuntimeFilterParamsContext* params = state->obj_pool()->add(new 
RuntimeFilterParamsContext());
     params->runtime_filter_wait_infinitely = 
state->runtime_filter_wait_infinitely();
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 86e470a706e..24ab78464db 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -71,8 +71,6 @@ public:
 
     ~RuntimeFilterMgr() = default;
 
-    Status init();
-
     Status get_consume_filter(const int filter_id, const int node_id,
                               IRuntimeFilter** consumer_filter);
 
@@ -114,7 +112,7 @@ private:
 
     TNetworkAddress _merge_addr;
 
-    bool _has_merge_addr;
+    bool _has_merge_addr = false;
     std::mutex _lock;
 };
 
@@ -174,7 +172,6 @@ private:
             std::pair<std::shared_ptr<RuntimeFilterCntlVal>, 
std::unique_ptr<std::mutex>>;
     std::map<int, CntlValwithLock> _filter_map;
     RuntimeFilterParamsContext* _state = nullptr;
-    bool _opt_remote_rf = true;
 };
 
 // RuntimeFilterMergeController has a map query-id -> entity
@@ -187,13 +184,37 @@ public:
     // add a query-id -> entity
     // If a query-id -> entity already exists
     // add_entity will return a exists entity
-    Status add_entity(const TExecPlanFragmentParams& params,
-                      std::shared_ptr<RuntimeFilterMergeControllerEntity>* 
handle,
-                      RuntimeFilterParamsContext* state);
-    Status add_entity(const TPipelineFragmentParams& params,
-                      const TPipelineInstanceParams& local_params,
+    Status add_entity(const auto& params, UniqueId query_id, const 
TQueryOptions& query_options,
                       std::shared_ptr<RuntimeFilterMergeControllerEntity>* 
handle,
-                      RuntimeFilterParamsContext* state);
+                      RuntimeFilterParamsContext* state) {
+        if (!params.__isset.runtime_filter_params ||
+            params.runtime_filter_params.rid_to_runtime_filter.size() == 0) {
+            return Status::OK();
+        }
+
+        // TODO: why we need string, direct use UniqueId
+        std::string query_id_str = query_id.to_string();
+        UniqueId fragment_instance_id = UniqueId(params.fragment_instance_id);
+        uint32_t shard = _get_controller_shard_idx(query_id);
+        std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
+        auto iter = _filter_controller_map[shard].find(query_id_str);
+        if (iter == _filter_controller_map[shard].end()) {
+            *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
+                    new RuntimeFilterMergeControllerEntity(state),
+                    [this](RuntimeFilterMergeControllerEntity* entity) {
+                        static_cast<void>(remove_entity(entity->query_id()));
+                        delete entity;
+                    });
+            _filter_controller_map[shard][query_id_str] = *handle;
+            const TRuntimeFilterParams& filter_params = 
params.runtime_filter_params;
+            RETURN_IF_ERROR(handle->get()->init(query_id, 
fragment_instance_id, filter_params,
+                                                query_options));
+        } else {
+            *handle = _filter_controller_map[shard][query_id_str].lock();
+        }
+        return Status::OK();
+    }
+
     // thread safe
     // increase a reference count
     // if a query-id is not exist
@@ -221,11 +242,6 @@ private:
     FilterControllerMap _filter_controller_map[kShardNum];
 };
 
-using runtime_filter_merge_entity_closer = 
std::function<void(RuntimeFilterMergeControllerEntity*)>;
-
-void runtime_filter_merge_entity_close(RuntimeFilterMergeController* 
controller,
-                                       RuntimeFilterMergeControllerEntity* 
entity);
-
 //There are two types of runtime filters:
 // one is global, originating from QueryContext,
 // and the other is local, originating from RuntimeState.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to