This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b2caa40ed7b [Improvement](fragment) Use partitioned hash map to manage contexts (… (#46282)
b2caa40ed7b is described below

commit b2caa40ed7bd787a35a1a60b1d705eabfea6ce28
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Fri Jan 3 16:46:02 2025 +0800

    [Improvement](fragment) Use partitioned hash map to manage contexts (… (#46282)
    
    …#46235)
    
    Contexts in `fragment_mgr` are managed by a global map and accessed
    concurrently by multiple threads under a single global lock, which
    introduced noticeable overhead. To address this, this PR uses a
    partitioned hash table so that each partition is guarded by its own lock.
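    
    A minimal sketch of the partitioning idea, using a simplified and purely
    illustrative PartitionedMap (the class actually added by this PR is
    ConcurrentContextMap in be/src/runtime/fragment_mgr.h; the partition count
    is controlled by the new BE config num_query_ctx_map_partitions, default
    128):
    
        #include <functional>
        #include <shared_mutex>
        #include <unordered_map>
        #include <vector>
        
        // Illustrative sketch only. Each partition ("shard") owns its own
        // mutex, so threads touching different keys rarely contend on the
        // same lock. Assumes Key is hashable via std::hash.
        template <typename Key, typename Value>
        class PartitionedMap {
        public:
            explicit PartitionedMap(size_t partitions) : _shards(partitions) {}
        
            void insert(const Key& key, Value value) {
                auto& shard = _shards[_shard_id(key)];
                std::unique_lock lock(shard.mutex); // writer locks one shard only
                shard.map[key] = std::move(value);
            }
        
            Value find(const Key& key) {
                auto& shard = _shards[_shard_id(key)];
                std::shared_lock lock(shard.mutex); // readers share the shard lock
                auto it = shard.map.find(key);
                return it == shard.map.end() ? Value{} : it->second;
            }
        
        private:
            struct Shard {
                std::shared_mutex mutex;
                std::unordered_map<Key, Value> map;
            };
        
            size_t _shard_id(const Key& key) const {
                return std::hash<Key>{}(key) % _shards.size();
            }
        
            std::vector<Shard> _shards;
        };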
---
 be/src/common/config.cpp        |   1 +
 be/src/common/config.h          |   2 +
 be/src/runtime/fragment_mgr.cpp | 508 +++++++++++++++++++++++-----------------
 be/src/runtime/fragment_mgr.h   |  60 +++--
 4 files changed, 342 insertions(+), 229 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 587862beffd..342638786ee 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -211,6 +211,7 @@ DEFINE_Int32(check_consistency_worker_count, "1");
 DEFINE_Int32(upload_worker_count, "1");
 // the count of thread to download
 DEFINE_Int32(download_worker_count, "1");
+DEFINE_Int32(num_query_ctx_map_partitions, "128");
 // the count of thread to make snapshot
 DEFINE_Int32(make_snapshot_worker_count, "5");
 // the count of thread to release snapshot
diff --git a/be/src/common/config.h b/be/src/common/config.h
index bcad3ee29a2..5706a38b4b0 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1350,6 +1350,8 @@ DECLARE_Int32(spill_io_thread_pool_queue_size);
 
 DECLARE_mBool(check_segment_when_build_rowset_meta);
 
+DECLARE_Int32(num_query_ctx_map_partitions);
+
 DECLARE_mBool(enable_s3_rate_limiter);
 DECLARE_mInt64(s3_get_bucket_tokens);
 DECLARE_mInt64(s3_get_token_per_second);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 87c165222fc..9abdc18ee35 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -225,6 +225,89 @@ static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries
     return result;
 }
 
+inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) {
+    uint32_t value = HashUtil::hash(&query_id.lo, 8, 0);
+    value = HashUtil::hash(&query_id.hi, 8, value);
+    return value % capacity;
+}
+
+inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) {
+    uint32_t value = HashUtil::hash(&key.first.lo, 8, 0);
+    value = HashUtil::hash(&key.first.hi, 8, value);
+    return value % capacity;
+}
+
+template <typename Key, typename Value, typename ValueType>
+ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
+    _internal_map.resize(config::num_query_ctx_map_partitions);
+    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
+        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
+                            phmap::flat_hash_map<Key, Value>()};
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::shared_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        auto search = map.find(query_id);
+        if (search != map.end()) {
+            return search->second;
+        }
+        return std::shared_ptr<ValueType>(nullptr);
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
+        const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::unique_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        auto search = map.find(query_id);
+        if (search != map.end()) {
+            query_ctx = search->second.lock();
+        }
+        if (!query_ctx) {
+            return function(map);
+        }
+        return Status::OK();
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::unique_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        map.erase(query_id);
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id,
+                                                         std::shared_ptr<ValueType> query_ctx) {
+    auto id = get_map_id(query_id, _internal_map.size());
+    {
+        std::unique_lock lock(*_internal_map[id].first);
+        auto& map = _internal_map[id].second;
+        map.insert({query_id, query_ctx});
+    }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::clear() {
+    for (auto& pair : _internal_map) {
+        std::unique_lock lock(*pair.first);
+        auto& map = pair.second;
+        map.clear();
+    }
+}
+
 FragmentMgr::FragmentMgr(ExecEnv* exec_env)
         : _exec_env(exec_env), _stop_background_threads_latch(1) {
     _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
@@ -253,14 +336,8 @@ void FragmentMgr::stop() {
     }
 
     // Only me can delete
-    {
-        std::unique_lock lock(_query_ctx_map_mutex);
-        _query_ctx_map.clear();
-    }
-    {
-        std::unique_lock lock(_pipeline_map_mutex);
-        _pipeline_map.clear();
-    }
+    _query_ctx_map.clear();
+    _pipeline_map.clear();
     _thread_pool->shutdown();
 }
 
@@ -640,17 +717,13 @@ void FragmentMgr::remove_pipeline_context(
     g_fragment_executing_count << -1;
     g_fragment_last_active_time.set_value(now);
 
-    std::unique_lock lock(_pipeline_map_mutex);
     _pipeline_map.erase({query_id, f_context->get_fragment_id()});
 }
 
 std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
-    std::shared_lock lock(_query_ctx_map_mutex);
-    auto search = _query_ctx_map.find(query_id);
-    if (search != _query_ctx_map.end()) {
-        if (auto q_ctx = search->second.lock()) {
-            return q_ctx;
-        }
+    auto val = _query_ctx_map.find(query_id);
+    if (auto q_ctx = val.lock()) {
+        return q_ctx;
     }
     return nullptr;
 }
@@ -677,67 +750,66 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
         }
     } else {
         if (!query_ctx) {
-            std::unique_lock lock(_query_ctx_map_mutex);
-            // Only one thread need create query ctx. other thread just get query_ctx in _query_ctx_map.
-            auto search = _query_ctx_map.find(query_id);
-            if (search != _query_ctx_map.end()) {
-                query_ctx = search->second.lock();
-            }
-
-            if (!query_ctx) {
-                WorkloadGroupPtr workload_group_ptr = nullptr;
-                std::string wg_info_str = "Workload Group not set";
-                if (params.__isset.workload_groups && !params.workload_groups.empty()) {
-                    uint64_t wg_id = params.workload_groups[0].id;
-                    workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id);
-                    if (workload_group_ptr != nullptr) {
-                        wg_info_str = workload_group_ptr->debug_string();
-                    } else {
-                        wg_info_str = "set wg but not find it in be";
-                    }
-                }
+            RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
+                    query_id, query_ctx,
+                    [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
+                            -> Status {
+                        WorkloadGroupPtr workload_group_ptr = nullptr;
+                        std::string wg_info_str = "Workload Group not set";
+                        if (params.__isset.workload_groups && !params.workload_groups.empty()) {
+                            uint64_t wg_id = params.workload_groups[0].id;
+                            workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id);
+                            if (workload_group_ptr != nullptr) {
+                                wg_info_str = workload_group_ptr->debug_string();
+                            } else {
+                                wg_info_str = "set wg but not find it in be";
+                            }
+                        }
 
-                // First time a fragment of a query arrived. print logs.
-                LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
-                          << ", total fragment num on current host: " << params.fragment_num_on_host
-                          << ", fe process uuid: " << params.query_options.fe_process_uuid
-                          << ", query type: " << params.query_options.query_type
-                          << ", report audit fe:" << params.current_connect_fe
-                          << ", use wg:" << wg_info_str;
-
-                // This may be a first fragment request of the query.
-                // Create the query fragments context.
-                query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options,
-                                                        params.coord, pipeline, params.is_nereids,
-                                                        params.current_connect_fe, query_source);
-                SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
-                RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
-                                                      &(query_ctx->desc_tbl)));
-                // set file scan range params
-                if (params.__isset.file_scan_params) {
-                    query_ctx->file_scan_range_params_map = params.file_scan_params;
-                }
+                        // First time a fragment of a query arrived. print logs.
+                        LOG(INFO) << "query_id: " << print_id(query_id)
+                                  << ", coord_addr: " << params.coord
+                                  << ", total fragment num on current host: "
+                                  << params.fragment_num_on_host
+                                  << ", fe process uuid: " << params.query_options.fe_process_uuid
+                                  << ", query type: " << params.query_options.query_type
+                                  << ", report audit fe:" << params.current_connect_fe
+                                  << ", use wg:" << wg_info_str;
+
+                        // This may be a first fragment request of the query.
+                        // Create the query fragments context.
+                        query_ctx = QueryContext::create_shared(
+                                query_id, _exec_env, params.query_options, params.coord, pipeline,
+                                params.is_nereids, params.current_connect_fe, query_source);
+                        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
+                        RETURN_IF_ERROR(DescriptorTbl::create(
+                                &(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
+                        // set file scan range params
+                        if (params.__isset.file_scan_params) {
+                            query_ctx->file_scan_range_params_map = params.file_scan_params;
+                        }
 
-                query_ctx->query_globals = params.query_globals;
+                        query_ctx->query_globals = params.query_globals;
 
-                if (params.__isset.resource_info) {
-                    query_ctx->user = params.resource_info.user;
-                    query_ctx->group = params.resource_info.group;
-                    query_ctx->set_rsc_info = true;
-                }
+                        if (params.__isset.resource_info) {
+                            query_ctx->user = params.resource_info.user;
+                            query_ctx->group = params.resource_info.group;
+                            query_ctx->set_rsc_info = true;
+                        }
 
-                _set_scan_concurrency(params, query_ctx.get());
+                        _set_scan_concurrency(params, query_ctx.get());
 
-                if (workload_group_ptr != nullptr) {
-                    RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
-                    query_ctx->set_workload_group(workload_group_ptr);
-                    _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
-                            print_id(query_id), workload_group_ptr->id());
-                }
-                // There is some logic in query ctx's dctor, we could not check if exists and delete the
-                // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
-                _query_ctx_map.insert({query_id, query_ctx});
-            }
+                        if (workload_group_ptr != nullptr) {
+                            RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
+                            query_ctx->set_workload_group(workload_group_ptr);
+                            _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
+                                    print_id(query_id), workload_group_ptr->id());
+                        }
+                        // There is some logic in query ctx's dctor, we could not check if exists and delete the
+                        // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
+                        map.insert({query_id, query_ctx});
+                        return Status::OK();
+                    }));
         }
     }
     return Status::OK();
@@ -754,24 +826,30 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
     {
         fmt::format_to(debug_string_buffer,
                        "{} pipeline fragment contexts are still running! duration_limit={}\n",
-                       _pipeline_map.size(), duration);
+                       _pipeline_map.num_items(), duration);
         timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
 
-        std::shared_lock lock(_pipeline_map_mutex);
-        for (auto& it : _pipeline_map) {
-            auto elapsed = it.second->elapsed_time() / 1000000000.0;
-            if (elapsed < duration) {
-                // Only display tasks which has been running for more than {duration} seconds.
-                continue;
+        _pipeline_map.apply([&](phmap::flat_hash_map<
+                                    std::pair<TUniqueId, int>,
+                                    std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+                                    -> Status {
+            for (auto& it : map) {
+                auto elapsed = it.second->elapsed_time() / 1000000000.0;
+                if (elapsed < duration) {
+                    // Only display tasks which has been running for more than {duration} seconds.
+                    continue;
+                }
+                auto timeout_second = it.second->timeout_second();
+                fmt::format_to(
+                        debug_string_buffer,
+                        "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}) : {}\n",
+                        i, elapsed, timeout_second, it.second->is_timeout(now),
+                        it.second->debug_string());
+                i++;
             }
-            auto timeout_second = it.second->timeout_second();
-            fmt::format_to(
-                    debug_string_buffer,
-                    "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}) : {}\n", i,
-                    elapsed, timeout_second, it.second->is_timeout(now), it.second->debug_string());
-            i++;
-        }
+            return Status::OK();
+        });
     }
     return fmt::to_string(debug_string_buffer);
 }
@@ -842,14 +920,13 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         g_fragment_last_active_time.set_value(now);
 
         // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
-        std::unique_lock lock(_pipeline_map_mutex);
-        auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
-        if (iter != _pipeline_map.end()) {
+        auto res = _pipeline_map.find({params.query_id, params.fragment_id});
+        if (res != nullptr) {
             return Status::InternalError(
                     "exec_plan_fragment query_id({}) input duplicated fragment_id({})",
                     print_id(params.query_id), params.fragment_id);
         }
-        _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
+        _pipeline_map.insert({params.query_id, params.fragment_id}, context);
     }
 
     if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
@@ -890,10 +967,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
         }
     }
     query_ctx->cancel(reason);
-    {
-        std::unique_lock l(_query_ctx_map_mutex);
-        _query_ctx_map.erase(query_id);
-    }
+    _query_ctx_map.erase(query_id);
     LOG(INFO) << "Query " << print_id(query_id)
               << " is cancelled and removed. Reason: " << reason.to_string();
 }
@@ -926,119 +1000,130 @@ void FragmentMgr::cancel_worker() {
         }
 
         std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> ctx;
-        {
-            std::shared_lock lock(_pipeline_map_mutex);
-            ctx.reserve(_pipeline_map.size());
-            for (auto& pipeline_itr : _pipeline_map) {
-                ctx.push_back(pipeline_itr.second);
-            }
-        }
+        _pipeline_map.apply(
+                [&](phmap::flat_hash_map<std::pair<TUniqueId, int>,
+                                         std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+                        -> Status {
+                    ctx.reserve(ctx.size() + map.size());
+                    for (auto& pipeline_itr : map) {
+                        ctx.push_back(pipeline_itr.second);
+                    }
+                    return Status::OK();
+                });
         for (auto& c : ctx) {
             c->clear_finished_tasks();
         }
 
         {
-            {
-                // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
-                // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
-                std::unique_lock lock(_query_ctx_map_mutex);
-                for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
-                    if (auto q_ctx = it->second.lock()) {
-                        if (q_ctx->is_timeout(now)) {
-                            LOG_WARNING("Query {} is timeout", print_id(it->first));
-                            queries_timeout.push_back(it->first);
+            _query_ctx_map.apply(
+                    [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
+                            -> Status {
+                        for (auto it = map.begin(); it != map.end();) {
+                            if (auto q_ctx = it->second.lock()) {
+                                if (q_ctx->is_timeout(now)) {
+                                    LOG_WARNING("Query {} is timeout", print_id(it->first));
+                                    queries_timeout.push_back(it->first);
+                                }
+                                ++it;
+                            } else {
+                                it = map.erase(it);
+                            }
                         }
-                        ++it;
-                    } else {
-                        it = _query_ctx_map.erase(it);
-                    }
-                }
-            }
+                        return Status::OK();
+                    });
 
-            std::shared_lock lock(_query_ctx_map_mutex);
             // We use a very conservative cancel strategy.
             // 0. If there are no running frontends, do not cancel any queries.
             // 1. If query's process uuid is zero, do not cancel
             // 2. If same process uuid, do not cancel
             // 3. If fe has zero process uuid, do not cancel
-            if (running_fes.empty() && !_query_ctx_map.empty()) {
+            if (running_fes.empty() && _query_ctx_map.num_items() != 0) {
                 LOG_EVERY_N(WARNING, 10)
                         << "Could not find any running frontends, maybe we are upgrading or "
                            "starting? "
                         << "We will not cancel any outdated queries in this situation.";
             } else {
-                for (const auto& it : _query_ctx_map) {
-                    if (auto q_ctx = it.second.lock()) {
-                        const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
-
-                        if (fe_process_uuid == 0) {
-                            // zero means this query is from a older version fe or
-                            // this fe is starting
-                            continue;
-                        }
-
-                        // If the query is not running on the any frontends, cancel it.
-                        if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
-                            itr != running_queries_on_all_fes.end()) {
-                            // Query not found on this frontend, and the query arrives before the last check
-                            if (itr->second.find(it.first) == itr->second.end() &&
-                                // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
-                                // tv_sec is enough, we do not need to check tv_nsec.
-                                q_ctx->get_query_arrival_timestamp().tv_sec <
-                                        check_invalid_query_last_timestamp.tv_sec &&
-                                q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
-                                queries_pipeline_task_leak.push_back(q_ctx->query_id());
-                                LOG_INFO(
-                                        "Query {}, type {} is not found on any frontends, maybe it "
-                                        "is leaked.",
-                                        print_id(q_ctx->query_id()),
-                                        toString(q_ctx->get_query_source()));
+                _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId,
+                                                              std::weak_ptr<QueryContext>>& map)
+                                             -> Status {
+                    for (const auto& it : map) {
+                        if (auto q_ctx = it.second.lock()) {
+                            const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
+
+                            if (fe_process_uuid == 0) {
+                                // zero means this query is from a older version fe or
+                                // this fe is starting
                                 continue;
                             }
-                        }
 
-                        auto itr = running_fes.find(q_ctx->coord_addr);
-                        if (itr != running_fes.end()) {
-                            if (fe_process_uuid == itr->second.info.process_uuid ||
-                                itr->second.info.process_uuid == 0) {
-                                continue;
-                            } else {
-                                LOG_WARNING(
-                                        "Coordinator of query {} restarted, going to cancel it.",
-                                        print_id(q_ctx->query_id()));
+                            // If the query is not running on the any frontends, cancel it.
+                            if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
+                                itr != running_queries_on_all_fes.end()) {
+                                // Query not found on this frontend, and the query arrives before the last check
+                                if (itr->second.find(it.first) == itr->second.end() &&
+                                    // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
+                                    // tv_sec is enough, we do not need to check tv_nsec.
+                                    q_ctx->get_query_arrival_timestamp().tv_sec <
+                                            check_invalid_query_last_timestamp.tv_sec &&
+                                    q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
+                                    queries_pipeline_task_leak.push_back(q_ctx->query_id());
+                                    LOG_INFO(
+                                            "Query {}, type {} is not found on any frontends, "
+                                            "maybe it "
+                                            "is leaked.",
+                                            print_id(q_ctx->query_id()),
+                                            toString(q_ctx->get_query_source()));
+                                    continue;
+                                }
                             }
-                        } else {
-                            // In some rear cases, the rpc port of follower is not updated in time,
-                            // then the port of this follower will be zero, but acutally it is still running,
-                            // and be has already received the query from follower.
-                            // So we need to check if host is in running_fes.
-                            bool fe_host_is_standing = std::any_of(
-                                    running_fes.begin(), running_fes.end(),
-                                    [&q_ctx](const auto& fe) {
-                                        return fe.first.hostname == q_ctx->coord_addr.hostname &&
-                                               fe.first.port == 0;
-                                    });
-                            if (fe_host_is_standing) {
-                                LOG_WARNING(
-                                        "Coordinator {}:{} is not found, but its host is still "
-                                        "running with an unstable brpc port, not going to cancel "
-                                        "it.",
-                                        q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
-                                        print_id(q_ctx->query_id()));
-                                continue;
+
+                            auto itr = running_fes.find(q_ctx->coord_addr);
+                            if (itr != running_fes.end()) {
+                                if (fe_process_uuid == itr->second.info.process_uuid ||
+                                    itr->second.info.process_uuid == 0) {
+                                    continue;
+                                } else {
+                                    LOG_WARNING(
+                                            "Coordinator of query {} restarted, going to cancel "
+                                            "it.",
+                                            print_id(q_ctx->query_id()));
+                                }
                             } else {
-                                LOG_WARNING(
-                                        "Could not find target coordinator {}:{} of query {}, "
-                                        "going to "
-                                        "cancel it.",
-                                        q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
-                                        print_id(q_ctx->query_id()));
+                                // In some rear cases, the rpc port of follower is not updated in time,
+                                // then the port of this follower will be zero, but acutally it is still running,
+                                // and be has already received the query from follower.
+                                // So we need to check if host is in running_fes.
+                                bool fe_host_is_standing =
+                                        std::any_of(running_fes.begin(), running_fes.end(),
+                                                    [&q_ctx](const auto& fe) {
+                                                        return fe.first.hostname ==
+                                                                       q_ctx->coord_addr.hostname &&
+                                                               fe.first.port == 0;
+                                                    });
+                                if (fe_host_is_standing) {
+                                    LOG_WARNING(
+                                            "Coordinator {}:{} is not found, but its host is still "
+                                            "running with an unstable brpc port, not going to "
+                                            "cancel "
+                                            "it.",
+                                            q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
+                                            print_id(q_ctx->query_id()));
+                                    continue;
+                                } else {
+                                    LOG_WARNING(
+                                            "Could not find target coordinator {}:{} of query {}, "
+                                            "going to "
+                                            "cancel it.",
+                                            q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
+                                            print_id(q_ctx->query_id()));
+                                }
                             }
                         }
+                        // Coordinator of this query has already dead or query context has been released.
+                        queries_lost_coordinator.push_back(it.first);
                     }
-                    // Coordinator of this query has already dead or query context has been released.
-                    queries_lost_coordinator.push_back(it.first);
-                }
+                    return Status::OK();
+                });
             }
         }
 
@@ -1169,7 +1254,6 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
 
 Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
                                    butil::IOBufAsZeroCopyInputStream* attach_data) {
-    bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
     int64_t start_apply = MonotonicMillis();
 
     std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
@@ -1179,24 +1263,18 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
 
     const auto& fragment_ids = request->fragment_ids();
     {
-        std::shared_lock lock(_pipeline_map_mutex);
         for (auto fragment_id : fragment_ids) {
-            if (is_pipeline) {
-                auto iter = _pipeline_map.find(
-                        {UniqueId(request->query_id()).to_thrift(), fragment_id});
-                if (iter == _pipeline_map.end()) {
-                    continue;
-                }
-                pip_context = iter->second;
-
-                DCHECK(pip_context != nullptr);
-                runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
-                query_thread_context = {pip_context->get_query_ctx()->query_id(),
-                                        pip_context->get_query_ctx()->query_mem_tracker,
-                                        pip_context->get_query_ctx()->workload_group()};
-            } else {
-                return Status::InternalError("Non-pipeline is disabled!");
+            pip_context =
+                    _pipeline_map.find({UniqueId(request->query_id()).to_thrift(), fragment_id});
+            if (pip_context == nullptr) {
+                continue;
             }
+
+            DCHECK(pip_context != nullptr);
+            runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
+            query_thread_context = {pip_context->get_query_ctx()->query_id(),
+                                    pip_context->get_query_ctx()->query_mem_tracker,
+                                    pip_context->get_query_ctx()->workload_group()};
             break;
         }
     }
@@ -1294,22 +1372,24 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
 }
 
 void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
-    {
-        std::unique_lock lock(_query_ctx_map_mutex);
-        for (auto iter = _query_ctx_map.begin(); iter != _query_ctx_map.end();) {
-            if (auto q_ctx = iter->second.lock()) {
-                WorkloadQueryInfo workload_query_info;
-                workload_query_info.query_id = print_id(iter->first);
-                workload_query_info.tquery_id = iter->first;
-                workload_query_info.wg_id =
-                        q_ctx->workload_group() == nullptr ? -1 : q_ctx->workload_group()->id();
-                query_info_list->push_back(workload_query_info);
-                iter++;
-            } else {
-                iter = _query_ctx_map.erase(iter);
-            }
-        }
-    }
+    _query_ctx_map.apply(
+            [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map) -> Status {
+                for (auto iter = map.begin(); iter != map.end();) {
+                    if (auto q_ctx = iter->second.lock()) {
+                        WorkloadQueryInfo workload_query_info;
+                        workload_query_info.query_id = print_id(iter->first);
+                        workload_query_info.tquery_id = iter->first;
+                        workload_query_info.wg_id = q_ctx->workload_group() == nullptr
+                                                            ? -1
+                                                            : q_ctx->workload_group()->id();
+                        query_info_list->push_back(workload_query_info);
+                        iter++;
+                    } else {
+                        iter = map.erase(iter);
+                    }
+                }
+                return Status::OK();
+            });
 }
 
 Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 0e7691647dd..fb01c899104 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -69,6 +69,46 @@ class WorkloadQueryInfo;
 
 std::string to_load_error_http_path(const std::string& file_name);
 
+template <typename Key, typename Value, typename ValueType>
+class ConcurrentContextMap {
+public:
+    using ApplyFunction = std::function<Status(phmap::flat_hash_map<Key, Value>&)>;
+    ConcurrentContextMap();
+    Value find(const Key& query_id);
+    void insert(const Key& query_id, std::shared_ptr<ValueType>);
+    void clear();
+    void erase(const Key& query_id);
+    size_t num_items() const {
+        size_t n = 0;
+        for (auto& pair : _internal_map) {
+            std::shared_lock lock(*pair.first);
+            auto& map = pair.second;
+            n += map.size();
+        }
+        return n;
+    }
+    void apply(ApplyFunction&& function) {
+        for (auto& pair : _internal_map) {
+            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
+            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
+            std::unique_lock lock(*pair.first);
+            static_cast<void>(function(pair.second));
+        }
+    }
+
+    Status apply_if_not_exists(const Key& query_id, std::shared_ptr<ValueType>& query_ctx,
+                               ApplyFunction&& function);
+
+private:
+    // The lock should only be used to protect the structures in fragment manager. Has to be
+    // used in a very small scope because it may dead lock. For example, if the _lock is used
+    // in prepare stage, the call path is  prepare --> expr prepare --> may call allocator
+    // when allocate failed, allocator may call query_is_cancelled, query is callced will also
+    // call _lock, so that there is dead lock.
+    std::vector<std::pair<std::unique_ptr<std::shared_mutex>, phmap::flat_hash_map<Key, Value>>>
+            _internal_map;
+};
+
 // This class used to manage all the fragment execute in this instance
 class FragmentMgr : public RestMonitorIface {
 public:
@@ -131,10 +171,7 @@ public:
 
     ThreadPool* get_thread_pool() { return _thread_pool.get(); }
 
-    int32_t running_query_num() {
-        std::shared_lock lock(_query_ctx_map_mutex);
-        return _query_ctx_map.size();
-    }
+    int32_t running_query_num() { return _query_ctx_map.num_items(); }
 
     std::string dump_pipeline_tasks(int64_t duration = 0);
     std::string dump_pipeline_tasks(TUniqueId& query_id);
@@ -164,21 +201,14 @@ private:
     // This is input params
     ExecEnv* _exec_env = nullptr;
 
-    // The lock protect the `_pipeline_map`
-    std::shared_mutex _pipeline_map_mutex;
     // (QueryID, FragmentID) -> PipelineFragmentContext
-    phmap::flat_hash_map<std::pair<TUniqueId, int>,
-                         std::shared_ptr<pipeline::PipelineFragmentContext>>
+    ConcurrentContextMap<std::pair<TUniqueId, int>,
+                         std::shared_ptr<pipeline::PipelineFragmentContext>,
+                         pipeline::PipelineFragmentContext>
             _pipeline_map;
 
-    // The lock should only be used to protect the structures in fragment manager. Has to be
-    // used in a very small scope because it may dead lock. For example, if the _lock is used
-    // in prepare stage, the call path is  prepare --> expr prepare --> may call allocator
-    // when allocate failed, allocator may call query_is_cancelled, query is callced will also
-    // call _lock, so that there is dead lock.
-    std::shared_mutex _query_ctx_map_mutex;
     // query id -> QueryContext
-    phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
+    ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> _query_ctx_map;
     std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
 
     CountDownLatch _stop_background_threads_latch;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

