This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 441b450a79 (runtimefilter) shorter time prepare consumes (#13127)
441b450a79 is described below

commit 441b450a79e1a65beb18a7ad57ad62b27911ce3c
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Thu Oct 6 10:12:29 2022 +0800

    (runtimefilter) shorter time prepare consumes (#13127)
    
    Now, every preare put a runtime filter controller, so it takes the
    mutex lock on the controller map. Init of bloom filter takes some
    time in allocate and memset. If we run p1 tests with -parallel=20
    -suiteParallel=20 -actionParallel=20, then we get error message like
    'send fragment timeout 5s'.
    
    The patch fixes the problem in the following 2 ways:
    1. Replace one mutex block with 128s.
    2. If a plan fragment does not have a runtime filter, it does not need to 
take
    the locks.
---
 be/src/runtime/runtime_filter_mgr.cpp | 38 ++++++++++++++++++++---------------
 be/src/runtime/runtime_filter_mgr.h   | 15 ++++++++++----
 2 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 34c9fb33e3..73a9115a23 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -280,49 +280,55 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
 Status RuntimeFilterMergeController::add_entity(
         const TExecPlanFragmentParams& params,
         std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
+    if (!params.params.__isset.runtime_filter_params ||
+        params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) 
{
+        return Status::OK();
+    }
+
     runtime_filter_merge_entity_closer entity_closer =
             std::bind(runtime_filter_merge_entity_close, this, 
std::placeholders::_1);
 
-    std::lock_guard<std::mutex> guard(_controller_mutex);
     UniqueId query_id(params.params.query_id);
     std::string query_id_str = query_id.to_string();
-    auto iter = _filter_controller_map.find(query_id_str);
     UniqueId fragment_instance_id = 
UniqueId(params.params.fragment_instance_id);
+    uint32_t shard = _get_controller_shard_idx(query_id);
+    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
+    auto iter = _filter_controller_map[shard].find(query_id_str);
 
-    if (iter == _filter_controller_map.end()) {
+    if (iter == _filter_controller_map[shard].end()) {
         *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
                 new RuntimeFilterMergeControllerEntity(), entity_closer);
-        _filter_controller_map[query_id_str] = *handle;
+        _filter_controller_map[shard][query_id_str] = *handle;
         const TRuntimeFilterParams& filter_params = 
params.params.runtime_filter_params;
-        if (params.params.__isset.runtime_filter_params) {
-            RETURN_IF_ERROR(handle->get()->init(query_id, 
fragment_instance_id, filter_params,
-                                                params.query_options));
-        }
+        RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, 
filter_params,
+                                            params.query_options));
     } else {
-        *handle = _filter_controller_map[query_id_str].lock();
+        *handle = _filter_controller_map[shard][query_id_str].lock();
     }
     return Status::OK();
 }
 
 Status RuntimeFilterMergeController::acquire(
         UniqueId query_id, 
std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
-    std::lock_guard<std::mutex> guard(_controller_mutex);
+    uint32_t shard = _get_controller_shard_idx(query_id);
+    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
     std::string query_id_str = query_id.to_string();
-    auto iter = _filter_controller_map.find(query_id_str);
-    if (iter == _filter_controller_map.end()) {
+    auto iter = _filter_controller_map[shard].find(query_id_str);
+    if (iter == _filter_controller_map[shard].end()) {
         LOG(WARNING) << "not found entity, query-id:" << query_id_str;
         return Status::InvalidArgument("not found entity");
     }
-    *handle = _filter_controller_map[query_id_str].lock();
+    *handle = _filter_controller_map[shard][query_id_str].lock();
     if (*handle == nullptr) {
         return Status::InvalidArgument("entity is closed");
     }
     return Status::OK();
 }
 
-Status RuntimeFilterMergeController::remove_entity(UniqueId queryId) {
-    std::lock_guard<std::mutex> guard(_controller_mutex);
-    _filter_controller_map.erase(queryId.to_string());
+Status RuntimeFilterMergeController::remove_entity(UniqueId query_id) {
+    uint32_t shard = _get_controller_shard_idx(query_id);
+    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
+    _filter_controller_map[shard].erase(query_id.to_string());
     return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index 1471f664f7..1dbb75471e 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -22,6 +22,7 @@
 #include <memory>
 #include <mutex>
 #include <thread>
+#include <unordered_map>
 
 #include "common/object_pool.h"
 #include "common/status.h"
@@ -162,16 +163,22 @@ public:
     // thread safe
     // remove a entity by query-id
     // remove_entity will be called automatically by entity when entity is 
destroyed
-    Status remove_entity(UniqueId queryId);
+    Status remove_entity(UniqueId query_id);
+
+    static const int kShardNum = 128;
 
 private:
-    std::mutex _controller_mutex;
+    uint32_t _get_controller_shard_idx(UniqueId& query_id) {
+        return (uint32_t)query_id.hi % kShardNum;
+    }
+
+    std::mutex _controller_mutex[kShardNum];
     // We store the weak pointer here.
     // When the external object is destroyed, we need to clear this record
     using FilterControllerMap =
-            std::map<std::string, 
std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
+            std::unordered_map<std::string, 
std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
     // str(query-id) -> entity
-    FilterControllerMap _filter_controller_map;
+    FilterControllerMap _filter_controller_map[kShardNum];
 };
 
 using runtime_filter_merge_entity_closer = 
std::function<void(RuntimeFilterMergeControllerEntity*)>;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to