This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 91a9aafc01 [dev-1.1.2](cherry-pick ) Optimize the return msg of process memory limit exceed #12101
91a9aafc01 is described below

commit 91a9aafc01fbb714a98a4dfa569d9c4c057b4b99
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Fri Aug 26 15:54:32 2022 +0800

    [dev-1.1.2](cherry-pick ) Optimize the return msg of process memory limit exceed #12101
---
 be/src/common/config.h                        |  4 +-
 be/src/runtime/bufferpool/buffer_allocator.cc |  2 +-
 be/src/runtime/mem_pool.cpp                   |  1 +
 be/src/runtime/memory/chunk_allocator.cpp     | 14 ++++-
 be/src/runtime/memory/mem_tracker_limiter.cpp | 81 +++++++++++++++++++--------
 be/src/runtime/memory/mem_tracker_limiter.h   | 32 ++++++-----
 6 files changed, 94 insertions(+), 40 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9533beff65..aa0b55bc0b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -402,6 +402,8 @@ CONF_Int32(min_buffer_size, "1024"); // 1024, The minimum 
read buffer size (in b
 // With 1024B through 8MB buffers, this is up to ~2GB of buffers.
 CONF_Int32(max_free_io_buffers, "128");
 
+// Whether to disable the memory cache pool,
+// including MemPool, ChunkAllocator, BufferPool, DiskIO free buffer.
 CONF_Bool(disable_mem_pools, "false");
 
 // Whether to allocate chunk using mmap. If you enable this, you'd better to
@@ -417,7 +419,7 @@ CONF_Bool(use_mmap_allocate_chunk, "false");
 // must larger than 0. and if larger than physical memory size, it will be set 
to physical memory size.
 // increase this variable can improve performance,
 // but will acquire more free memory which can not be used by other modules.
-CONF_mString(chunk_reserved_bytes_limit, "20%");
+CONF_mString(chunk_reserved_bytes_limit, "2147483648");
 // 1024, The minimum chunk allocator size (in bytes)
 CONF_Int32(min_chunk_reserved_bytes, "1024");
 
diff --git a/be/src/runtime/bufferpool/buffer_allocator.cc 
b/be/src/runtime/bufferpool/buffer_allocator.cc
index a3bbe4c6c2..55a4de1076 100644
--- a/be/src/runtime/bufferpool/buffer_allocator.cc
+++ b/be/src/runtime/bufferpool/buffer_allocator.cc
@@ -238,7 +238,7 @@ Status 
BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle*
     }
     if (UNLIKELY(len > system_bytes_limit_)) {
         err_stream << "Tried to allocate buffer of " << len << " bytes"
-                   << " > buffer pool limit of  " << MAX_BUFFER_BYTES << " 
bytes";
+                   << " > buffer pool limit of  " << system_bytes_limit_ << " 
bytes";
         return Status::InternalError(err_stream.str());
     }
 
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index 8518347c18..f108f2266b 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -124,6 +124,7 @@ bool MemPool::find_chunk(size_t min_size, bool 
check_limits) {
     if (config::disable_mem_pools) {
         // Disable pooling by sizing the chunk to fit only this allocation.
         // Make sure the alignment guarantees are respected.
+        // This will generate too many small chunks.
         chunk_size = std::max<size_t>(min_size, alignof(max_align_t));
     } else {
         DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE);
diff --git a/be/src/runtime/memory/chunk_allocator.cpp 
b/be/src/runtime/memory/chunk_allocator.cpp
index 9d5cbfd592..6f22031006 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -33,6 +33,11 @@
 
 namespace doris {
 
+// Chunks of size <= MIN_CHUNK_SIZE: a large number of small chunks wastes extra storage and increases lock time.
+static constexpr size_t MIN_CHUNK_SIZE = 4096; // 4K
+// Chunks of size >= MAX_CHUNK_SIZE: large chunks may go unused for a long time, wasting memory.
+static constexpr size_t MAX_CHUNK_SIZE = 64 * (1ULL << 20); // 64M
+
 ChunkAllocator* ChunkAllocator::_s_instance = nullptr;
 
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_local_core_alloc_count, 
MetricUnit::NOUNIT);
@@ -176,14 +181,19 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) {
 }
 
 void ChunkAllocator::free(const Chunk& chunk) {
-    if (chunk.core_id == -1) {
+    DCHECK(chunk.core_id != -1);
+    CHECK((chunk.size & (chunk.size - 1)) == 0);
+    if (config::disable_mem_pools) {
+        SystemAllocator::free(chunk.data, chunk.size);
         return;
     }
+
     int64_t old_reserved_bytes = _reserved_bytes;
     int64_t new_reserved_bytes = 0;
     do {
         new_reserved_bytes = old_reserved_bytes + chunk.size;
-        if (new_reserved_bytes > _reserve_bytes_limit) {
+        if (chunk.size <= MIN_CHUNK_SIZE || chunk.size >= MAX_CHUNK_SIZE ||
+            new_reserved_bytes > _reserve_bytes_limit) {
             int64_t cost_ns = 0;
             {
                 SCOPED_RAW_TIMER(&cost_ns);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 64f19873c2..4b268858b9 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -149,8 +149,9 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {
 Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
     if (UNLIKELY(gc_memory(_limit - bytes))) {
         return Status::MemoryLimitExceeded(fmt::format(
-                "need_size={}, exceeded_tracker={}, limit={}, peak_used={}, 
current_used={}", bytes,
-                label(), _limit, _consumption->value(), 
_consumption->current_value()));
+                "failed_alloc_size={}Bytes, exceeded_tracker={}, limit={}B, 
peak_used={}B, "
+                "current_used={}B",
+                bytes, label(), _limit, _consumption->value(), 
_consumption->current_value()));
     }
     VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
                 << " consumption=" << _consumption->current_value() << " 
limit=" << _limit;
@@ -223,22 +224,25 @@ std::string MemTrackerLimiter::log_usage(int 
max_recursive_depth,
     return join(usage_strings, "\n");
 }
 
-Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) {
-    DCHECK(_limit != -1);
+Status MemTrackerLimiter::mem_limit_exceeded_construct(const std::string& msg) 
{
     std::string detail = fmt::format(
-            "{}, backend={} memory used={}, free memory left={}. If is query, 
can change the limit "
+            "{}, backend {} process memory used {}, process limit {}. If is 
query, can "
+            "change the limit "
             "by `set exec_mem_limit=xxx`, details mem usage see be.INFO.",
             msg, BackendOptions::get_localhost(),
             PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES),
-            PrettyPrinter::print(MemInfo::mem_limit() - 
PerfCounters::get_vm_rss(), TUnit::BYTES));
-    Status status = Status::MemoryLimitExceeded(detail);
+            PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES));
+    return Status::MemoryLimitExceeded(detail);
+}
 
+void MemTrackerLimiter::print_log_usage(const std::string& msg) {
     // only print the tracker log_usage in be log.
+    std::string detail = msg;
     if (_print_log_usage) {
         if (_label == "Process") {
             // Dumping the process MemTracker is expensive. Limiting the 
recursive depth to two
             // levels limits the level of detail to a one-line summary for 
each query MemTracker.
-            detail += "\n" + 
ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2);
+            detail += "\n" + log_usage(2);
         } else {
             detail += "\n" + log_usage();
         }
@@ -246,37 +250,66 @@ Status MemTrackerLimiter::mem_limit_exceeded_log(const 
std::string& msg) {
         LOG(WARNING) << detail;
         _print_log_usage = false;
     }
-    return status;
 }
 
 Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
                                              int64_t failed_allocation_size) {
     STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
     DCHECK(!_limited_ancestors.empty());
-    std::string detail = fmt::format("memory limit 
exceeded:<consumed_tracker={}, ", _label);
+    std::string detail = fmt::format("Memory limit exceeded, 
<consuming_tracker={}, ", _label);
     if (failed_allocation_size != 0)
-        detail += fmt::format("need_size={}, ",
+        detail += fmt::format("failed_alloc_size={}Bytes, ",
                               PrettyPrinter::print(failed_allocation_size, 
TUnit::BYTES));
-    MemTrackerLimiter* exceeded_tracker = this;
-    int64_t free_size = INT_MAX;
+    MemTrackerLimiter* exceeded_tracker = nullptr;
+    MemTrackerLimiter* max_consumption_tracker = nullptr;
+    int64_t free_size = INT64_MAX;
+    // Find the first tracker that exceeds its limit; otherwise remember the tracker with the least free memory.
     for (const auto& tracker : _limited_ancestors) {
+        if (tracker->label() == "Process") continue;
         int64_t max_consumption = tracker->peak_consumption() > 
tracker->consumption()
                                           ? tracker->peak_consumption()
                                           : tracker->consumption();
-        if (tracker->has_limit() && tracker->limit() < max_consumption + 
failed_allocation_size) {
+        if (tracker->limit() < max_consumption + failed_allocation_size) {
             exceeded_tracker = tracker;
             break;
         }
-        if (tracker->has_limit() && tracker->limit() - max_consumption < 
free_size) {
+        if (tracker->limit() - max_consumption < free_size) {
             free_size = tracker->limit() - max_consumption;
-            exceeded_tracker = tracker;
+            max_consumption_tracker = tracker;
         }
     }
-    detail += fmt::format(
-            "exceeded_tracker={}, limit={}, peak_used={}, current_used={}>, 
executing_msg:<{}>",
-            exceeded_tracker->label(), exceeded_tracker->limit(),
-            exceeded_tracker->peak_consumption(), 
exceeded_tracker->consumption(), msg);
-    return exceeded_tracker->mem_limit_exceeded_log(detail);
+
+    auto sys_exceed_st = check_sys_mem_info(failed_allocation_size);
+    MemTrackerLimiter* print_log_usage_tracker = nullptr;
+    if (exceeded_tracker != nullptr) {
+        detail += fmt::format(
+                "exceeded_tracker={}, limit={}B, peak_used={}B, current_used={}B>, "
+                "executing_msg:<{}>",
+                exceeded_tracker->label(), exceeded_tracker->limit(),
+                exceeded_tracker->peak_consumption(), 
exceeded_tracker->consumption(), msg);
+        print_log_usage_tracker = exceeded_tracker;
+    } else if (!sys_exceed_st) {
+        detail = fmt::format("Memory limit exceeded, {}, executing_msg:<{}>",
+                             sys_exceed_st.get_error_msg(), msg);
+    } else if (max_consumption_tracker != nullptr) {
+        // Must come after the check_sys_mem_info failure case above.
+        detail += fmt::format(
+                "max_consumption_tracker={}, limit={}B, peak_used={}B, 
current_used={}B>, "
+                "executing_msg:<{}>",
+                max_consumption_tracker->label(), 
max_consumption_tracker->limit(),
+                max_consumption_tracker->peak_consumption(), 
max_consumption_tracker->consumption(),
+                msg);
+        print_log_usage_tracker = max_consumption_tracker;
+    } else {
+        // The limits of the current tracker and its parents are all less than 0, so the consume will not fail,
+        // and the current process memory has not exceeded its limit.
+        detail += fmt::format("unknown exceed reason, executing_msg:<{}>", 
msg);
+        print_log_usage_tracker = 
ExecEnv::GetInstance()->new_process_mem_tracker().get();
+    }
+    auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
+    if (print_log_usage_tracker != nullptr)
+        print_log_usage_tracker->print_log_usage(st.get_error_msg());
+    return st;
 }
 
 Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
@@ -284,9 +317,11 @@ Status MemTrackerLimiter::mem_limit_exceeded(const 
std::string& msg,
                                              Status failed_try_consume_st) {
     STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
     std::string detail =
-            fmt::format("memory limit exceeded:<consumed_tracker={}, {}>, 
executing_msg:<{}>",
+            fmt::format("memory limit exceeded:<consuming_tracker={}, {}>, 
executing_msg:<{}>",
                         _label, failed_try_consume_st.get_error_msg(), msg);
-    return failed_tracker->mem_limit_exceeded_log(detail);
+    auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
+    failed_tracker->print_log_usage(st.get_error_msg());
+    return st;
 }
 
 Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const 
std::string& msg,
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 885acffc11..1933a15fc5 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -70,9 +70,12 @@ public:
         // of the process malloc.
         // for fast, expect MemInfo::initialized() to be true.
         if (PerfCounters::get_vm_rss() + bytes >= MemInfo::mem_limit()) {
-            return Status::MemoryLimitExceeded(fmt::format(
-                    "{}: TryConsume failed, bytes={} process whole 
consumption={}  mem limit={}",
-                    _label, bytes, MemInfo::current_mem(), 
MemInfo::mem_limit()));
+            auto st = Status::MemoryLimitExceeded(
+                    fmt::format("Memory limit exceeded, process memory used {} 
exceed limit {}, "
+                    "consuming_tracker={}, failed_alloc_size={}",
+                    PerfCounters::get_vm_rss(), MemInfo::mem_limit(), _label, 
bytes));
+            
ExecEnv::GetInstance()->new_process_mem_tracker()->print_log_usage(st.get_error_msg());
+            return st;
         }
         return Status::OK();
     }
@@ -195,7 +198,8 @@ private:
                                  const std::list<MemTrackerLimiter*>& trackers,
                                  int64_t* logged_consumption);
 
-    Status mem_limit_exceeded_log(const std::string& msg);
+    static Status mem_limit_exceeded_construct(const std::string& msg);
+    void print_log_usage(const std::string& msg);
 
 private:
     // Limit on memory consumption, in bytes. If limit_ == -1, there is no 
consumption limit. Used in log_usage。
@@ -278,7 +282,9 @@ inline Status MemTrackerLimiter::try_consume(int64_t bytes) 
{
     // Walk the tracker tree top-down.
     for (i = _all_ancestors.size() - 1; i >= 0; --i) {
         MemTrackerLimiter* tracker = _all_ancestors[i];
-        if (tracker->limit() < 0) {
+        // The Process tracker does not participate in the process memory limit: its consumption is virtual memory,
+        // which differs from the real physical memory of the process. That check is replaced by check_sys_mem_info.
+        if (tracker->limit() < 0 || _label == "Process") {
             tracker->_consumption->add(bytes); // No limit at this tracker.
         } else {
             // If TryConsume fails, we can try to GC, but we may need to try 
several times if
@@ -307,14 +313,14 @@ inline Status MemTrackerLimiter::check_limit(int64_t 
bytes) {
     RETURN_IF_ERROR(check_sys_mem_info(bytes));
     int i;
     // Walk the tracker tree top-down.
-    for (i = _all_ancestors.size() - 1; i >= 0; --i) {
-        MemTrackerLimiter* tracker = _all_ancestors[i];
-        if (tracker->limit() > 0) {
-            while (true) {
-                if (LIKELY(tracker->_consumption->current_value() + bytes < 
tracker->limit()))
-                    break;
-                RETURN_IF_ERROR(tracker->try_gc_memory(bytes));
-            }
+    for (i = _limited_ancestors.size() - 1; i >= 0; --i) {
+        MemTrackerLimiter* tracker = _limited_ancestors[i];
+        // The Process tracker does not participate in the process memory limit: its consumption is virtual memory,
+        // which differs from the real physical memory of the process. That check is replaced by check_sys_mem_info.
+        if (tracker->label() == "Process") continue;
+        while (true) {
+            if (LIKELY(tracker->_consumption->current_value() + bytes < 
tracker->limit())) break;
+            RETURN_IF_ERROR(tracker->try_gc_memory(bytes));
         }
     }
     return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to