This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new ff3b091964 [fix](memtracker) Improve performance of tracking real physical memory of PodArray #12021 (#12055) ff3b091964 is described below commit ff3b0919646fa2cc80441917ab6f34ad9d3c7a6e Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Aug 25 10:13:46 2022 +0800 [fix](memtracker) Improve performance of tracking real physical memory of PodArray #12021 (#12055) --- be/src/runtime/mem_pool.h | 2 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 9 ++++---- be/src/runtime/memory/mem_tracker_limiter.h | 30 ++++++++++++------------- be/src/runtime/memory/mem_tracker_task_pool.cpp | 4 +--- be/src/runtime/memory/thread_mem_tracker_mgr.h | 10 --------- be/src/runtime/thread_context.h | 8 ++++--- be/src/vec/common/pod_array.h | 2 +- 7 files changed, 28 insertions(+), 37 deletions(-) diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 1f8f5aaf67..128916007b 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -208,7 +208,7 @@ private: bool check_integrity(bool check_current_chunk_empty); void reset_peak() { - if (total_allocated_bytes_ - peak_allocated_bytes_ > 1024) { + if (total_allocated_bytes_ - peak_allocated_bytes_ > 4096) { THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_, ExecEnv::GetInstance()->new_process_mem_tracker().get()); peak_allocated_bytes_ = total_allocated_bytes_; diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index b64d2899cb..64f19873c2 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -249,20 +249,21 @@ Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) { return status; } -Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size) { +Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, + int64_t failed_allocation_size) { STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); DCHECK(!_limited_ancestors.empty()); std::string detail = fmt::format("memory limit exceeded:<consumed_tracker={}, ", _label); - if (failed_consume_size != 0) + if (failed_allocation_size != 0) detail += fmt::format("need_size={}, ", - PrettyPrinter::print(failed_consume_size, TUnit::BYTES)); + PrettyPrinter::print(failed_allocation_size, TUnit::BYTES)); MemTrackerLimiter* exceeded_tracker = this; int64_t free_size = INT_MAX; for (const auto& tracker : _limited_ancestors) { int64_t max_consumption = tracker->peak_consumption() > tracker->consumption() ? tracker->peak_consumption() : tracker->consumption(); - if (tracker->has_limit() && tracker->limit() < max_consumption + failed_consume_size) { + if (tracker->has_limit() && tracker->limit() < max_consumption + failed_allocation_size) { exceeded_tracker = tracker; break; } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 767e4fdafb..885acffc11 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -128,7 +128,13 @@ public: // It is used for revise mem tracker consumption. // If the location of memory alloc and free is different, the consumption value of mem tracker will be inaccurate. // But the consumption value of the process mem tracker is not affecte - void consume_local(int64_t bytes, MemTrackerLimiter* end_tracker); + void cache_consume_local(int64_t bytes); + + // Will not change the value of process_mem_tracker, even though mem_tracker == process_mem_tracker. + void transfer_to(int64_t size, MemTrackerLimiter* dst) { + cache_consume_local(-size); + dst->cache_consume_local(size); + } void enable_print_log_usage() { _print_log_usage = true; } @@ -145,11 +151,11 @@ public: // If 'failed_allocation_size' is greater than zero, logs the allocation size. If // 'failed_allocation_size' is zero, nothing about the allocation size is logged. // If 'state' is non-nullptr, logs the error to 'state'. - Status mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size = 0); + Status mem_limit_exceeded(const std::string& msg, int64_t failed_allocation_size = 0); Status mem_limit_exceeded(const std::string& msg, MemTrackerLimiter* failed_tracker, Status failed_try_consume_st); Status mem_limit_exceeded(RuntimeState* state, const std::string& msg, - int64_t failed_consume_size = 0); + int64_t failed_allocation_size = 0); std::string debug_string() { std::stringstream msg; @@ -181,7 +187,6 @@ private: // the current value is returned and set to 0. // Thread safety. int64_t add_untracked_mem(int64_t bytes); - void consume_cache(int64_t bytes); // Log consumption of all the trackers provided. Returns the sum of consumption in // 'logged_consumption'. 'max_recursive_depth' specifies the maximum number of levels @@ -252,19 +257,14 @@ inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { return 0; } -inline void MemTrackerLimiter::consume_cache(int64_t bytes) { +inline void MemTrackerLimiter::cache_consume_local(int64_t bytes) { + if (bytes == 0) return; int64_t consume_bytes = add_untracked_mem(bytes); if (consume_bytes != 0) { - consume(consume_bytes); - } -} - -inline void MemTrackerLimiter::consume_local(int64_t bytes, MemTrackerLimiter* end_tracker) { - DCHECK(end_tracker); - if (bytes == 0) return; - for (auto& tracker : _all_ancestors) { - if (tracker->label() == end_tracker->label()) return; - tracker->_consumption->add(bytes); + for (auto& tracker : _all_ancestors) { + if (tracker->label() == "Process") return; + tracker->_consumption->add(bytes); + } } } diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index c080261056..a08d876370 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -93,9 +93,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { // In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers, // the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is, // the negative number of the current value of consume. - it->second->parent()->consume_local( - -it->second->consumption(), - ExecEnv::GetInstance()->new_process_mem_tracker().get()); + it->second->parent()->cache_consume_local(-it->second->consumption()); LOG(INFO) << fmt::format( "Deregister query/load memory tracker, queryId={}, Limit={}, PeakUsed={}", it->first, it->second->limit(), it->second->peak_consumption()); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 449a1e070c..0cd371598a 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -75,16 +75,6 @@ public: // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, void consume(int64_t size); - // Will not change the value of process_mem_tracker, even though mem_tracker == process_mem_tracker. - void transfer_to(int64_t size, MemTrackerLimiter* mem_tracker) { - consume(-size); - mem_tracker->consume_cache(size); - } - void transfer_from(int64_t size, MemTrackerLimiter* mem_tracker) { - mem_tracker->consume_cache(-size); - consume(size); - } - template <bool CheckLimit> void flush_untracked_mem(); diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 2f752516c1..5f524996ae 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -256,10 +256,12 @@ public: doris::thread_context()->_thread_mem_tracker_mgr->consume(size) #define RELEASE_THREAD_MEM_TRACKER(size) \ doris::thread_context()->_thread_mem_tracker_mgr->consume(-size) -#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ - doris::thread_context()->_thread_mem_tracker_mgr->transfer_to(size, tracker) +#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ + doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to(size, \ + tracker) #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ - doris::thread_context()->_thread_mem_tracker_mgr->transfer_from(size, tracker) + tracker->transfer_to( \ + size, doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker().get()) #define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ return doris::thread_context() \ ->_thread_mem_tracker_mgr->limiter_mem_tracker() \ diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h index 9b80473032..5d2961ccea 100644 --- a/be/src/vec/common/pod_array.h +++ b/be/src/vec/common/pod_array.h @@ -113,7 +113,7 @@ protected: } inline void reset_peak() { - if (UNLIKELY(c_end - c_end_peak > 1024)) { + if (UNLIKELY(c_end - c_end_peak > 4096)) { THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak, ExecEnv::GetInstance()->new_process_mem_tracker().get()); c_end_peak = c_end; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org