This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 95d27cf6b26 [Opt](exec) change transmit block to rw lock to opt 
performance #43223 (#43492)
95d27cf6b26 is described below

commit 95d27cf6b265cf38bc9d6fa30680ba30d422624c
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon Nov 11 17:32:09 2024 +0800

    [Opt](exec) change transmit block to rw lock to opt performance #43223 
(#43492)
    
    cherry pick #43223
    
    Co-authored-by: HappenLee <happen...@selectdb.com>
---
 be/src/vec/runtime/vdata_stream_mgr.cpp | 8 ++++----
 be/src/vec/runtime/vdata_stream_mgr.h   | 3 ++-
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 4e48effb566..a620d6bde80 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -70,7 +70,7 @@ std::shared_ptr<VDataStreamRecvr> 
VDataStreamMgr::create_recvr(
                                                                  
fragment_instance_id, dest_node_id,
                                                                  num_senders, 
is_merging, profile));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
-    std::lock_guard<std::mutex> l(_lock);
+    std::unique_lock l(_lock);
     _fragment_stream_set.insert(std::make_pair(fragment_instance_id, 
dest_node_id));
     _receiver_map.insert(std::make_pair(hash_value, recvr));
     return recvr;
@@ -82,7 +82,7 @@ Status VDataStreamMgr::find_recvr(const TUniqueId& 
fragment_instance_id, PlanNod
              << ", node=" << node_id;
     size_t hash_value = get_hash_value(fragment_instance_id, node_id);
     // Create lock guard and not own lock currently and will lock conditionally
-    std::unique_lock recvr_lock(_lock, std::defer_lock);
+    std::shared_lock recvr_lock(_lock, std::defer_lock);
     if (acquire_lock) {
         recvr_lock.lock();
     }
@@ -155,7 +155,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& 
fragment_instance_id, P
                << ", node=" << node_id;
     size_t hash_value = get_hash_value(fragment_instance_id, node_id);
     {
-        std::lock_guard<std::mutex> l(_lock);
+        std::unique_lock l(_lock);
         auto range = _receiver_map.equal_range(hash_value);
         while (range.first != range.second) {
             const std::shared_ptr<VDataStreamRecvr>& recvr = 
range.first->second;
@@ -189,7 +189,7 @@ void VDataStreamMgr::cancel(const TUniqueId& 
fragment_instance_id, Status exec_s
     VLOG_QUERY << "cancelling all streams for fragment=" << 
print_id(fragment_instance_id);
     std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
     {
-        std::lock_guard<std::mutex> l(_lock);
+        std::shared_lock l(_lock);
         FragmentStreamSet::iterator i =
                 
_fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
         while (i != _fragment_stream_set.end() && i->first == 
fragment_instance_id) {
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h 
b/be/src/vec/runtime/vdata_stream_mgr.h
index 43605bb6abd..63cf118c4f5 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -23,6 +23,7 @@
 #include <memory>
 #include <mutex>
 #include <set>
+#include <shared_mutex>
 #include <unordered_map>
 #include <utility>
 
@@ -65,7 +66,7 @@ public:
     void cancel(const TUniqueId& fragment_instance_id, Status exec_status);
 
 private:
-    std::mutex _lock;
+    std::shared_mutex _lock;
     using StreamMap = std::unordered_multimap<uint32_t, 
std::shared_ptr<VDataStreamRecvr>>;
     StreamMap _receiver_map;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to