This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 95d27cf6b26 [Opt](exec) change transmit block to rw lock to opt performance #43223 (#43492) 95d27cf6b26 is described below commit 95d27cf6b265cf38bc9d6fa30680ba30d422624c Author: HappenLee <happen...@hotmail.com> AuthorDate: Mon Nov 11 17:32:09 2024 +0800 [Opt](exec) change transmit block to rw lock to opt performance #43223 (#43492) cherry pick #43223 Co-authored-by: HappenLee <happen...@selectdb.com> --- be/src/vec/runtime/vdata_stream_mgr.cpp | 8 ++++---- be/src/vec/runtime/vdata_stream_mgr.h | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 4e48effb566..a620d6bde80 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -70,7 +70,7 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr( fragment_instance_id, dest_node_id, num_senders, is_merging, profile)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); - std::lock_guard<std::mutex> l(_lock); + std::unique_lock l(_lock); _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id)); _receiver_map.insert(std::make_pair(hash_value, recvr)); return recvr; @@ -82,7 +82,7 @@ Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNod << ", node=" << node_id; size_t hash_value = get_hash_value(fragment_instance_id, node_id); // Create lock guard and not own lock currently and will lock conditionally - std::unique_lock recvr_lock(_lock, std::defer_lock); + std::shared_lock recvr_lock(_lock, std::defer_lock); if (acquire_lock) { recvr_lock.lock(); } @@ -155,7 +155,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P << ", node=" << node_id; size_t hash_value = get_hash_value(fragment_instance_id, node_id); { - std::lock_guard<std::mutex> l(_lock); + std::unique_lock l(_lock); auto range = _receiver_map.equal_range(hash_value); while (range.first != range.second) { const std::shared_ptr<VDataStreamRecvr>& recvr = range.first->second; @@ -189,7 +189,7 @@ void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_s VLOG_QUERY << "cancelling all streams for fragment=" << print_id(fragment_instance_id); std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs; { - std::lock_guard<std::mutex> l(_lock); + std::shared_lock l(_lock); FragmentStreamSet::iterator i = _fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0)); while (i != _fragment_stream_set.end() && i->first == fragment_instance_id) { diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index 43605bb6abd..63cf118c4f5 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -23,6 +23,7 @@ #include <memory> #include <mutex> #include <set> +#include <shared_mutex> #include <unordered_map> #include <utility> @@ -65,7 +66,7 @@ public: void cancel(const TUniqueId& fragment_instance_id, Status exec_status); private: - std::mutex _lock; + std::shared_mutex _lock; using StreamMap = std::unordered_multimap<uint32_t, std::shared_ptr<VDataStreamRecvr>>; StreamMap _receiver_map; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org