This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f41562e6a63 [Fix](rf) fix multi thread init error in 
RuntimeFilterMergeControllerEntity (#31337)
f41562e6a63 is described below

commit f41562e6a63814eccf6dbd2587afd9ec90b1aae3
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Fri Feb 23 20:34:13 2024 +0800

    [Fix](rf) fix multi thread init error in RuntimeFilterMergeControllerEntity 
(#31337)
---
 be/src/runtime/fragment_mgr.cpp       | 12 +++++++++---
 be/src/runtime/runtime_filter_mgr.cpp | 12 +++++-------
 be/src/runtime/runtime_filter_mgr.h   | 18 +++++++-----------
 3 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f6176516721..b9a1a6ef594 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -719,9 +719,11 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params,
     static_cast<void>(_runtimefilter_controller.add_entity(
             params.params, params.params.query_id, params.query_options, 
&handler,
             
RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
-    query_ctx->set_merge_controller_handler(handler);
     {
         std::lock_guard<std::mutex> lock(_lock);
+        if (handler) {
+            query_ctx->set_merge_controller_handler(handler);
+        }
         _fragment_instance_map.insert(
                 std::make_pair(params.params.fragment_instance_id, 
fragment_executor));
         _cv.notify_all();
@@ -807,7 +809,9 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             static_cast<void>(_runtimefilter_controller.add_entity(
                     params.local_params[i], params.query_id, 
params.query_options, &handler,
                     
RuntimeFilterParamsContext::create(context->get_runtime_state())));
-            query_ctx->set_merge_controller_handler(handler);
+            if (i == 0 and handler) {
+                query_ctx->set_merge_controller_handler(handler);
+            }
             const TUniqueId& fragment_instance_id = 
params.local_params[i].fragment_instance_id;
             {
                 std::lock_guard<std::mutex> lock(_lock);
@@ -887,7 +891,9 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             static_cast<void>(_runtimefilter_controller.add_entity(
                     local_params, params.query_id, params.query_options, 
&handler,
                     
RuntimeFilterParamsContext::create(context->get_runtime_state())));
-            query_ctx->set_merge_controller_handler(handler);
+            if (i == 0 and handler) {
+                query_ctx->set_merge_controller_handler(handler);
+            }
             {
                 std::lock_guard<std::mutex> lock(_lock);
                 _pipeline_map.insert(std::make_pair(fragment_instance_id, 
context));
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 81d5dc88d54..a3b7ad3ddad 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -486,24 +486,22 @@ Status RuntimeFilterMergeController::acquire(
         UniqueId query_id, 
std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
     uint32_t shard = _get_controller_shard_idx(query_id);
     std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
-    std::string query_id_str = query_id.to_string();
-    auto iter = _filter_controller_map[shard].find(query_id_str);
+    auto iter = _filter_controller_map[shard].find(query_id);
     if (iter == _filter_controller_map[shard].end()) {
-        LOG(WARNING) << "not found entity, query-id:" << query_id_str;
+        LOG(WARNING) << "not found entity, query-id:" << query_id.to_string();
         return Status::InvalidArgument("not found entity");
     }
-    *handle = _filter_controller_map[shard][query_id_str].lock();
+    *handle = _filter_controller_map[shard][query_id].lock();
     if (*handle == nullptr) {
         return Status::InvalidArgument("entity is closed");
     }
     return Status::OK();
 }
 
-Status RuntimeFilterMergeController::remove_entity(UniqueId query_id) {
+void RuntimeFilterMergeController::remove_entity(UniqueId query_id) {
     uint32_t shard = _get_controller_shard_idx(query_id);
     std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
-    _filter_controller_map[shard].erase(query_id.to_string());
-    return Status::OK();
+    _filter_controller_map[shard].erase(query_id);
 }
 
 RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* 
state) {
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index 19908166942..d29978d3b60 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -133,7 +133,7 @@ private:
 class RuntimeFilterMergeControllerEntity {
 public:
     RuntimeFilterMergeControllerEntity(RuntimeFilterParamsContext* state)
-            : _query_id(0, 0), _fragment_instance_id(0, 0), _state(state) {}
+            : _query_id(0, 0), _state(state) {}
     ~RuntimeFilterMergeControllerEntity() = default;
 
     Status init(UniqueId query_id, const TRuntimeFilterParams& 
runtime_filter_params,
@@ -145,8 +145,6 @@ public:
 
     UniqueId query_id() const { return _query_id; }
 
-    UniqueId instance_id() const { return _fragment_instance_id; }
-
     struct RuntimeFilterCntlVal {
         int64_t merge_time;
         int producer_size;
@@ -170,7 +168,6 @@ private:
                            const int producer_size);
 
     UniqueId _query_id;
-    UniqueId _fragment_instance_id;
     // protect _filter_map
     std::shared_mutex _filter_map_mutex;
     std::shared_ptr<MemTracker> _mem_tracker;
@@ -199,22 +196,21 @@ public:
         }
 
         // TODO: why we need string, direct use UniqueId
-        std::string query_id_str = query_id.to_string();
         uint32_t shard = _get_controller_shard_idx(query_id);
         std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
-        auto iter = _filter_controller_map[shard].find(query_id_str);
+        auto iter = _filter_controller_map[shard].find(query_id);
         if (iter == _filter_controller_map[shard].end()) {
             *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
                     new RuntimeFilterMergeControllerEntity(state),
                     [this](RuntimeFilterMergeControllerEntity* entity) {
-                        static_cast<void>(remove_entity(entity->query_id()));
+                        remove_entity(entity->query_id());
                         delete entity;
                     });
-            _filter_controller_map[shard][query_id_str] = *handle;
+            _filter_controller_map[shard][query_id] = *handle;
             const TRuntimeFilterParams& filter_params = 
params.runtime_filter_params;
             RETURN_IF_ERROR(handle->get()->init(query_id, filter_params, 
query_options));
         } else {
-            *handle = _filter_controller_map[shard][query_id_str].lock();
+            *handle = _filter_controller_map[shard][query_id].lock();
         }
         return Status::OK();
     }
@@ -228,7 +224,7 @@ public:
     // thread safe
     // remove a entity by query-id
     // remove_entity will be called automatically by entity when entity is 
destroyed
-    Status remove_entity(UniqueId query_id);
+    void remove_entity(UniqueId query_id);
 
     static const int kShardNum = 128;
 
@@ -241,7 +237,7 @@ private:
     // We store the weak pointer here.
     // When the external object is destroyed, we need to clear this record
     using FilterControllerMap =
-            std::unordered_map<std::string, 
std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
+            std::unordered_map<UniqueId, 
std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
     // str(query-id) -> entity
     FilterControllerMap _filter_controller_map[kShardNum];
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to