This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 42a34666a44 branch-3.0: [fix](memory) Fix `ThreadMemTrackerMgr::limiter_mem_tracker()` performance (#50525) 42a34666a44 is described below commit 42a34666a44f98ad322484ba84b8f9a97e968562 Author: Xinyi Zou <zouxi...@selectdb.com> AuthorDate: Wed May 21 14:31:31 2025 +0800 branch-3.0: [fix](memory) Fix `ThreadMemTrackerMgr::limiter_mem_tracker()` performance (#50525) pick #50462 --- be/src/olap/page_cache.cpp | 3 ++- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 8 +++++--- be/src/runtime/memory/thread_mem_tracker_mgr.h | 16 +++++++++++++--- be/src/runtime/thread_context.cpp | 2 +- be/src/runtime/thread_context.h | 13 +++++++------ be/src/util/byte_buffer.h | 3 ++- 6 files changed, 30 insertions(+), 15 deletions(-) diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index 1f0556f4642..b386da4d7c6 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -30,7 +30,8 @@ PageBase<TAllocator>::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB if (use_cache) { _mem_tracker_by_allocator = StoragePageCache::instance()->mem_tracker(page_type); } else { - _mem_tracker_by_allocator = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + _mem_tracker_by_allocator = + thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr(); } { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 3b40426f6ef..dcdf4b1a4b4 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -51,12 +51,13 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( // _untracked_mem temporary store bytes that not synchronized to process reserved memory, // but bytes have been subtracted from thread _reserved_mem. doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem); - _limiter_tracker->release_reserved(_untracked_mem); + _limiter_tracker_sptr->release_reserved(_untracked_mem); _reserved_mem = 0; _untracked_mem = 0; } _consumer_tracker_stack.clear(); - _limiter_tracker = mem_tracker; + _limiter_tracker_sptr = mem_tracker; + _limiter_tracker = _limiter_tracker_sptr.get(); } void ThreadMemTrackerMgr::detach_limiter_tracker( @@ -68,7 +69,8 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( _reserved_mem = _last_attach_snapshots_stack.back().reserved_mem; _consumer_tracker_stack = _last_attach_snapshots_stack.back().consumer_tracker_stack; _last_attach_snapshots_stack.pop_back(); - _limiter_tracker = old_mem_tracker; + _limiter_tracker_sptr = old_mem_tracker; + _limiter_tracker = _limiter_tracker_sptr.get(); } void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index db3b32a6298..e3a1409ddfc 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -93,11 +93,19 @@ public: void reset_query_cancelled_flag(bool new_val) { _is_query_cancelled = new_val; } - std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { + MemTrackerLimiter* limiter_mem_tracker() { CHECK(init()); return _limiter_tracker; } + // Prefer use `limiter_mem_tracker`, which is faster than `limiter_mem_tracker_sptr`. + // when multiple threads hold the same `std::shared_ptr` at the same time, + // modifying the `std::shared_ptr` reference count will be expensive when there is high concurrency. + std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker_sptr() { + CHECK(init()); + return _limiter_tracker_sptr; + } + void enable_wait_gc() { _wait_gc = true; } void disable_wait_gc() { _wait_gc = false; } [[nodiscard]] bool wait_gc() const { return _wait_gc; } @@ -141,7 +149,8 @@ private: // A thread of query/load will only wait once during execution. bool _wait_gc = false; - std::shared_ptr<MemTrackerLimiter> _limiter_tracker; + std::shared_ptr<MemTrackerLimiter> _limiter_tracker_sptr {nullptr}; + MemTrackerLimiter* _limiter_tracker {nullptr}; std::vector<MemTracker*> _consumer_tracker_stack; std::weak_ptr<WorkloadGroup> _wg_wptr; @@ -156,7 +165,8 @@ inline bool ThreadMemTrackerMgr::init() { // 2. ExecEnv not initialized when thread start, initialized in limiter_mem_tracker(). if (_init) return true; if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) { - _limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker(); + _limiter_tracker_sptr = ExecEnv::GetInstance()->orphan_mem_tracker(); + _limiter_tracker = _limiter_tracker_sptr.get(); _wait_gc = true; _init = true; return true; diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index c89f532e592..2aee48819c6 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -28,7 +28,7 @@ class MemTracker; QueryThreadContext ThreadContext::query_thread_context() { DCHECK(doris::pthread_context_ptr_init); ORPHAN_TRACKER_CHECK(); - return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker(), _wg_wptr}; + return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker_sptr(), _wg_wptr}; } void AttachTask::init(const QueryThreadContext& query_thread_context) { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index e0a44af69c1..30871399eed 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -235,7 +235,7 @@ public: // to nullptr, but the object it points to is not initialized. At this time, when the memory // is released somewhere, the hook is triggered to cause the crash. std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr; - [[nodiscard]] std::shared_ptr<MemTrackerLimiter> thread_mem_tracker() const { + [[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const { return thread_mem_tracker_mgr->limiter_mem_tracker(); } @@ -402,7 +402,8 @@ public: #ifndef BE_TEST ORPHAN_TRACKER_CHECK(); query_id = doris::thread_context()->task_id(); - query_mem_tracker = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + query_mem_tracker = + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr(); wg_wptr = doris::thread_context()->workload_group(); #else query_id = TUniqueId(); @@ -468,8 +469,8 @@ public: const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) { DCHECK(mem_tracker); doris::ThreadLocalHandle::create_thread_local_if_not_exits(); - if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); } } @@ -480,8 +481,8 @@ public: query_thread_context.query_id); // workload group alse not change DCHECK(query_thread_context.query_mem_tracker); if (query_thread_context.query_mem_tracker != - thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( query_thread_context.query_mem_tracker); } diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index 17764b9e4f6..474a50339dc 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -73,7 +73,8 @@ private: : pos(0), limit(capacity_), capacity(capacity_), - mem_tracker_(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + mem_tracker_( + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) { ptr = reinterpret_cast<char*>(Allocator<false>::alloc(capacity_)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org