This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new 91a9aafc01 [dev-1.1.2](cherry-pick ) Optimize the return msg of process memory limit exceed #12101 91a9aafc01 is described below commit 91a9aafc01fbb714a98a4dfa569d9c4c057b4b99 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Fri Aug 26 15:54:32 2022 +0800 [dev-1.1.2](cherry-pick ) Optimize the return msg of process memory limit exceed #12101 --- be/src/common/config.h | 4 +- be/src/runtime/bufferpool/buffer_allocator.cc | 2 +- be/src/runtime/mem_pool.cpp | 1 + be/src/runtime/memory/chunk_allocator.cpp | 14 ++++- be/src/runtime/memory/mem_tracker_limiter.cpp | 81 +++++++++++++++++++-------- be/src/runtime/memory/mem_tracker_limiter.h | 32 ++++++----- 6 files changed, 94 insertions(+), 40 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 9533beff65..aa0b55bc0b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -402,6 +402,8 @@ CONF_Int32(min_buffer_size, "1024"); // 1024, The minimum read buffer size (in b // With 1024B through 8MB buffers, this is up to ~2GB of buffers. CONF_Int32(max_free_io_buffers, "128"); +// Whether to disable the memory cache pool, +// including MemPool, ChunkAllocator, BufferPool, DiskIO free buffer. CONF_Bool(disable_mem_pools, "false"); // Whether to allocate chunk using mmap. If you enable this, you'd better to @@ -417,7 +419,7 @@ CONF_Bool(use_mmap_allocate_chunk, "false"); // must larger than 0. and if larger than physical memory size, it will be set to physical memory size. // increase this variable can improve performance, // but will acquire more free memory which can not be used by other modules. 
-CONF_mString(chunk_reserved_bytes_limit, "20%"); +CONF_mString(chunk_reserved_bytes_limit, "2147483648"); // 1024, The minimum chunk allocator size (in bytes) CONF_Int32(min_chunk_reserved_bytes, "1024"); diff --git a/be/src/runtime/bufferpool/buffer_allocator.cc b/be/src/runtime/bufferpool/buffer_allocator.cc index a3bbe4c6c2..55a4de1076 100644 --- a/be/src/runtime/bufferpool/buffer_allocator.cc +++ b/be/src/runtime/bufferpool/buffer_allocator.cc @@ -238,7 +238,7 @@ Status BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle* } if (UNLIKELY(len > system_bytes_limit_)) { err_stream << "Tried to allocate buffer of " << len << " bytes" - << " > buffer pool limit of " << MAX_BUFFER_BYTES << " bytes"; + << " > buffer pool limit of " << system_bytes_limit_ << " bytes"; return Status::InternalError(err_stream.str()); } diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 8518347c18..f108f2266b 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -124,6 +124,7 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) { if (config::disable_mem_pools) { // Disable pooling by sizing the chunk to fit only this allocation. // Make sure the alignment guarantees are respected. + // This will generate too many small chunks. chunk_size = std::max<size_t>(min_size, alignof(max_align_t)); } else { DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE); diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index 9d5cbfd592..6f22031006 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -33,6 +33,11 @@ namespace doris { +// <= MIN_CHUNK_SIZE, A large number of small chunks will waste extra storage and increase lock time. +static constexpr size_t MIN_CHUNK_SIZE = 4096; // 4K +// >= MAX_CHUNK_SIZE, Large chunks may not be used for a long time, wasting memory. 
+static constexpr size_t MAX_CHUNK_SIZE = 64 * (1ULL << 20); // 64M + ChunkAllocator* ChunkAllocator::_s_instance = nullptr; DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_local_core_alloc_count, MetricUnit::NOUNIT); @@ -176,14 +181,19 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { } void ChunkAllocator::free(const Chunk& chunk) { - if (chunk.core_id == -1) { + DCHECK(chunk.core_id != -1); + CHECK((chunk.size & (chunk.size - 1)) == 0); + if (config::disable_mem_pools) { + SystemAllocator::free(chunk.data, chunk.size); return; } + int64_t old_reserved_bytes = _reserved_bytes; int64_t new_reserved_bytes = 0; do { new_reserved_bytes = old_reserved_bytes + chunk.size; - if (new_reserved_bytes > _reserve_bytes_limit) { + if (chunk.size <= MIN_CHUNK_SIZE || chunk.size >= MAX_CHUNK_SIZE || + new_reserved_bytes > _reserve_bytes_limit) { int64_t cost_ns = 0; { SCOPED_RAW_TIMER(&cost_ns); diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 64f19873c2..4b268858b9 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -149,8 +149,9 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) { Status MemTrackerLimiter::try_gc_memory(int64_t bytes) { if (UNLIKELY(gc_memory(_limit - bytes))) { return Status::MemoryLimitExceeded(fmt::format( - "need_size={}, exceeded_tracker={}, limit={}, peak_used={}, current_used={}", bytes, - label(), _limit, _consumption->value(), _consumption->current_value())); + "failed_alloc_size={}Bytes, exceeded_tracker={}, limit={}B, peak_used={}B, " + "current_used={}B", + bytes, label(), _limit, _consumption->value(), _consumption->current_value())); } VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes << " consumption=" << _consumption->current_value() << " limit=" << _limit; @@ -223,22 +224,25 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, return join(usage_strings, "\n"); } -Status 
MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) { - DCHECK(_limit != -1); +Status MemTrackerLimiter::mem_limit_exceeded_construct(const std::string& msg) { std::string detail = fmt::format( - "{}, backend={} memory used={}, free memory left={}. If is query, can change the limit " + "{}, backend {} process memory used {}, process limit {}. If is query, can " + "change the limit " "by `set exec_mem_limit=xxx`, details mem usage see be.INFO.", msg, BackendOptions::get_localhost(), PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES), - PrettyPrinter::print(MemInfo::mem_limit() - PerfCounters::get_vm_rss(), TUnit::BYTES)); - Status status = Status::MemoryLimitExceeded(detail); + PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)); + return Status::MemoryLimitExceeded(detail); +} +void MemTrackerLimiter::print_log_usage(const std::string& msg) { // only print the tracker log_usage in be log. + std::string detail = msg; if (_print_log_usage) { if (_label == "Process") { // Dumping the process MemTracker is expensive. Limiting the recursive depth to two // levels limits the level of detail to a one-line summary for each query MemTracker. 
- detail += "\n" + ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2); + detail += "\n" + log_usage(2); } else { detail += "\n" + log_usage(); } @@ -246,37 +250,66 @@ Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) { LOG(WARNING) << detail; _print_log_usage = false; } - return status; } Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t failed_allocation_size) { STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); DCHECK(!_limited_ancestors.empty()); - std::string detail = fmt::format("memory limit exceeded:<consumed_tracker={}, ", _label); + std::string detail = fmt::format("Memory limit exceeded, <consuming_tracker={}, ", _label); if (failed_allocation_size != 0) - detail += fmt::format("need_size={}, ", + detail += fmt::format("failed_alloc_size={}Bytes, ", PrettyPrinter::print(failed_allocation_size, TUnit::BYTES)); - MemTrackerLimiter* exceeded_tracker = this; - int64_t free_size = INT_MAX; + MemTrackerLimiter* exceeded_tracker = nullptr; + MemTrackerLimiter* max_consumption_tracker = nullptr; + int64_t free_size = INT64_MAX; + // Find the tracker that exceed limit and has the least free. for (const auto& tracker : _limited_ancestors) { + if (tracker->label() == "Process") continue; int64_t max_consumption = tracker->peak_consumption() > tracker->consumption() ? 
tracker->peak_consumption() : tracker->consumption(); - if (tracker->has_limit() && tracker->limit() < max_consumption + failed_allocation_size) { + if (tracker->limit() < max_consumption + failed_allocation_size) { exceeded_tracker = tracker; break; } - if (tracker->has_limit() && tracker->limit() - max_consumption < free_size) { + if (tracker->limit() - max_consumption < free_size) { free_size = tracker->limit() - max_consumption; - exceeded_tracker = tracker; + max_consumption_tracker = tracker; } } - detail += fmt::format( - "exceeded_tracker={}, limit={}, peak_used={}, current_used={}>, executing_msg:<{}>", - exceeded_tracker->label(), exceeded_tracker->limit(), - exceeded_tracker->peak_consumption(), exceeded_tracker->consumption(), msg); - return exceeded_tracker->mem_limit_exceeded_log(detail); + + auto sys_exceed_st = check_sys_mem_info(failed_allocation_size); + MemTrackerLimiter* print_log_usage_tracker = nullptr; + if (exceeded_tracker != nullptr) { + detail += fmt::format( + "exceeded_tracker={}, limit={}B, peak_used={}B, current_used={B}>, " + "executing_msg:<{}>", + exceeded_tracker->label(), exceeded_tracker->limit(), + exceeded_tracker->peak_consumption(), exceeded_tracker->consumption(), msg); + print_log_usage_tracker = exceeded_tracker; + } else if (!sys_exceed_st) { + detail = fmt::format("Memory limit exceeded, {}, executing_msg:<{}>", + sys_exceed_st.get_error_msg(), msg); + } else if (max_consumption_tracker != nullptr) { + // must after check_sys_mem_info false + detail += fmt::format( + "max_consumption_tracker={}, limit={}B, peak_used={}B, current_used={}B>, " + "executing_msg:<{}>", + max_consumption_tracker->label(), max_consumption_tracker->limit(), + max_consumption_tracker->peak_consumption(), max_consumption_tracker->consumption(), + msg); + print_log_usage_tracker = max_consumption_tracker; + } else { + // The limit of the current tracker and parents is less than 0, the consume will not fail, + // and the current process memory has 
no excess limit. + detail += fmt::format("unknown exceed reason, executing_msg:<{}>", msg); + print_log_usage_tracker = ExecEnv::GetInstance()->new_process_mem_tracker().get(); + } + auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail); + if (print_log_usage_tracker != nullptr) + print_log_usage_tracker->print_log_usage(st.get_error_msg()); + return st; } Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, @@ -284,9 +317,11 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, Status failed_try_consume_st) { STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); std::string detail = - fmt::format("memory limit exceeded:<consumed_tracker={}, {}>, executing_msg:<{}>", + fmt::format("memory limit exceeded:<consuming_tracker={}, {}>, executing_msg:<{}>", _label, failed_try_consume_st.get_error_msg(), msg); - return failed_tracker->mem_limit_exceeded_log(detail); + auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail); + failed_tracker->print_log_usage(st.get_error_msg()); + return st; } Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& msg, diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 885acffc11..1933a15fc5 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -70,9 +70,12 @@ public: // of the process malloc. // for fast, expect MemInfo::initialized() to be true. 
if (PerfCounters::get_vm_rss() + bytes >= MemInfo::mem_limit()) { - return Status::MemoryLimitExceeded(fmt::format( - "{}: TryConsume failed, bytes={} process whole consumption={} mem limit={}", - _label, bytes, MemInfo::current_mem(), MemInfo::mem_limit())); + auto st = Status::MemoryLimitExceeded( + fmt::format("Memory limit exceeded, process memory used {} exceed limit {}, " + "consuming_tracker={}, failed_alloc_size={}", + PerfCounters::get_vm_rss(), MemInfo::mem_limit(), _label, bytes)); + ExecEnv::GetInstance()->new_process_mem_tracker()->print_log_usage(st.get_error_msg()); + return st; } return Status::OK(); } @@ -195,7 +198,8 @@ private: const std::list<MemTrackerLimiter*>& trackers, int64_t* logged_consumption); - Status mem_limit_exceeded_log(const std::string& msg); + static Status mem_limit_exceeded_construct(const std::string& msg); + void print_log_usage(const std::string& msg); private: // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. Used in log_usage。 @@ -278,7 +282,9 @@ inline Status MemTrackerLimiter::try_consume(int64_t bytes) { // Walk the tracker tree top-down. for (i = _all_ancestors.size() - 1; i >= 0; --i) { MemTrackerLimiter* tracker = _all_ancestors[i]; - if (tracker->limit() < 0) { + // Process tracker does not participate in the process memory limit, process tracker consumption is virtual memory, + // and there is a diff between the real physical memory value of the process. It is replaced by check_sys_mem_info. + if (tracker->limit() < 0 || _label == "Process") { tracker->_consumption->add(bytes); // No limit at this tracker. } else { // If TryConsume fails, we can try to GC, but we may need to try several times if @@ -307,14 +313,14 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) { RETURN_IF_ERROR(check_sys_mem_info(bytes)); int i; // Walk the tracker tree top-down. 
-    for (i = _all_ancestors.size() - 1; i >= 0; --i) {
-        MemTrackerLimiter* tracker = _all_ancestors[i];
-        if (tracker->limit() > 0) {
-            while (true) {
-                if (LIKELY(tracker->_consumption->current_value() + bytes < tracker->limit()))
-                    break;
-                RETURN_IF_ERROR(tracker->try_gc_memory(bytes));
-            }
+    for (i = _limited_ancestors.size() - 1; i >= 0; --i) {
+        MemTrackerLimiter* tracker = _limited_ancestors[i];
+        // Process tracker does not participate in the process memory limit, process tracker consumption is virtual memory,
+        // and there is a diff between the real physical memory value of the process. It is replaced by check_sys_mem_info.
+        if (tracker->label() == "Process") continue;
+        while (true) {
+            if (LIKELY(tracker->_consumption->current_value() + bytes < tracker->limit())) break;
+            RETURN_IF_ERROR(tracker->try_gc_memory(bytes));
         }
     }
     return Status::OK();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org