This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f58e18a34b3 [opt](memtracker) Optimize  `PODArray` memory tracking 
accuracy (#50549)
f58e18a34b3 is described below

commit f58e18a34b3298a6311c3f9fb40d7c882a6ac2eb
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Sat May 10 17:50:30 2025 +0800

    [opt](memtracker) Optimize  `PODArray` memory tracking accuracy (#50549)
    
    ### What problem does this PR solve?
    
    Based on #11740
    
    The memory size tracked in Allocator is virtual memory. If the allocated
    memory is not fully used, the tracked virtual memory may be larger than
    the actual physical memory.
    
    `PODArray` does not memset 0 when allocated memory, the memory blocks
    allocated by `PODArray` (such as VOlapScanNode::_free_blocks) are
    usually used for memory reuse and will not be fully used.
    
    After this PR, the unused memory in `PODArray` is recorded in reserve to
    indicate that this part of the memory is not actually used.
---
 be/src/olap/memtable_flush_executor.cpp            |   3 +-
 be/src/runtime/memory/global_memory_arbitrator.cpp |  10 +
 be/src/runtime/memory/global_memory_arbitrator.h   |   1 +
 be/src/runtime/memory/mem_counter.h                |   5 -
 be/src/runtime/memory/mem_tracker.h                |   4 +-
 be/src/runtime/memory/mem_tracker_limiter.cpp      |   7 +-
 be/src/runtime/memory/mem_tracker_limiter.h        |   2 +-
 be/src/runtime/memory/memory_reclamation.cpp       |   6 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |  77 +++--
 be/src/runtime/query_context.cpp                   |   6 +-
 be/src/runtime/workload_group/workload_group.cpp   |   8 +-
 .../workload_group/workload_group_manager.cpp      |   4 +-
 .../runtime/workload_management/memory_context.cpp |   6 +-
 .../workload_management/task_controller.cpp        |   6 +-
 be/src/util/pretty_printer.h                       |   3 +-
 be/src/vec/common/allocator.cpp                    | 334 +++++++++++++++------
 be/src/vec/common/allocator.h                      | 204 ++++---------
 be/src/vec/common/allocator_fwd.h                  |   2 +
 be/src/vec/common/pod_array.h                      | 113 ++++++-
 be/src/vec/common/pod_array_fwd.h                  |  13 +-
 .../runtime/memory/thread_mem_tracker_mgr_test.cpp |   2 +-
 .../testutil/mock/mock_thread_mem_tracker_mgr.h    |   5 +-
 be/test/vec/common/pod_array_test.cpp              | 291 +++++++++++++++++-
 23 files changed, 798 insertions(+), 314 deletions(-)

diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index 44df6cd46f1..938b12ef5a9 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -149,7 +149,8 @@ Status FlushToken::_try_reserve_memory(const 
std::shared_ptr<ResourceContext>& r
     int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s;
     do {
         // only try to reserve process memory
-        st = thread_context->thread_mem_tracker_mgr->try_reserve(size, true);
+        st = thread_context->thread_mem_tracker_mgr->try_reserve(
+                size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS);
         if (st.ok()) {
             memtable_flush_executor->inc_flushing_task();
             break;
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp 
b/be/src/runtime/memory/global_memory_arbitrator.cpp
index 33d2941f884..5aed0721fff 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.cpp
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -96,6 +96,16 @@ void GlobalMemoryArbitrator::refresh_memory_bvar() {
                        
memory_arbitrator_refresh_interval_growth_bytes.get_value();
 }
 
+bool GlobalMemoryArbitrator::reserve_process_memory(int64_t bytes) {
+    int64_t old_reserved_mem = 
_process_reserved_memory.load(std::memory_order_relaxed);
+    int64_t new_reserved_mem = 0;
+    do {
+        new_reserved_mem = old_reserved_mem + bytes;
+    } while (!_process_reserved_memory.compare_exchange_weak(old_reserved_mem, 
new_reserved_mem,
+                                                             
std::memory_order_relaxed));
+    return true;
+}
+
 bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
     if (sys_mem_available() - bytes < 
MemInfo::sys_mem_available_warning_water_mark()) {
         
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h 
b/be/src/runtime/memory/global_memory_arbitrator.h
index 7ac6fc4795e..7ac98b1c1f2 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -90,6 +90,7 @@ public:
         return msg;
     }
 
+    static bool reserve_process_memory(int64_t bytes);
     static bool try_reserve_process_memory(int64_t bytes);
     static void shrink_process_reserved(int64_t bytes);
 
diff --git a/be/src/runtime/memory/mem_counter.h 
b/be/src/runtime/memory/mem_counter.h
index 029bf01bb70..335c91f18ee 100644
--- a/be/src/runtime/memory/mem_counter.h
+++ b/be/src/runtime/memory/mem_counter.h
@@ -83,11 +83,6 @@ public:
     int64_t current_value() const { return 
_current_value.load(std::memory_order_relaxed); }
     int64_t peak_value() const { return 
_peak_value.load(std::memory_order_relaxed); }
 
-    static std::string print_bytes(int64_t bytes) {
-        return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES)
-                          : "-" + PrettyPrinter::print(std::abs(bytes), 
TUnit::BYTES);
-    }
-
 private:
     std::atomic<int64_t> _current_value {0};
     std::atomic<int64_t> _peak_value {0};
diff --git a/be/src/runtime/memory/mem_tracker.h 
b/be/src/runtime/memory/mem_tracker.h
index deb27ee2639..382efa82de0 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -47,8 +47,8 @@ public:
     const std::string& label() const { return _label; }
     std::string log_usage() const {
         return fmt::format("MemTracker name={}, Used={}({} B), Peak={}({} B)", 
_label,
-                           MemCounter::print_bytes(consumption()), 
consumption(),
-                           MemCounter::print_bytes(peak_consumption()), 
peak_consumption());
+                           PrettyPrinter::print_bytes(consumption()), 
consumption(),
+                           PrettyPrinter::print_bytes(peak_consumption()), 
peak_consumption());
     }
 
 private:
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 16f8ffb01f3..c564aa3691a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -360,9 +360,10 @@ std::string 
MemTrackerLimiter::tracker_limit_exceeded_str() {
     std::string err_msg = fmt::format(
             "memory tracker limit exceeded, tracker label:{}, type:{}, limit "
             "{}, peak used {}, current used {}. backend {}, {}.",
-            label(), type_string(_type), MemCounter::print_bytes(limit()),
-            MemCounter::print_bytes(peak_consumption()), 
MemCounter::print_bytes(consumption()),
-            BackendOptions::get_localhost(), 
GlobalMemoryArbitrator::process_memory_used_str());
+            label(), type_string(_type), PrettyPrinter::print_bytes(limit()),
+            PrettyPrinter::print_bytes(peak_consumption()),
+            PrettyPrinter::print_bytes(consumption()), 
BackendOptions::get_localhost(),
+            GlobalMemoryArbitrator::process_memory_used_str());
     if (_type == Type::QUERY || _type == Type::LOAD) {
         err_msg += fmt::format(
                 " exec node:<{}>, can `set exec_mem_limit` to change limit, 
details see be.INFO.",
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 2051143da53..7dd66eafe1b 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -311,7 +311,7 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) 
{
     // it will not take effect.
     if (consumption() + bytes > _limit) {
         return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, 
{}",
-                                                       
MemCounter::print_bytes(bytes),
+                                                       
PrettyPrinter::print_bytes(bytes),
                                                        
tracker_limit_exceeded_str()));
     }
     return Status::OK();
diff --git a/be/src/runtime/memory/memory_reclamation.cpp 
b/be/src/runtime/memory/memory_reclamation.cpp
index 162ae28cdcf..5f1b0d55e17 100644
--- a/be/src/runtime/memory/memory_reclamation.cpp
+++ b/be/src/runtime/memory/memory_reclamation.cpp
@@ -183,7 +183,7 @@ bool MemoryReclamation::revoke_process_memory(const 
std::string& revoke_reason)
     LOG(INFO) << fmt::format(
             "[MemoryGC] start MemoryReclamation::revoke_process_memory, {}, 
need free size: {}.",
             GlobalMemoryArbitrator::process_mem_log_str(),
-            MemCounter::print_bytes(MemInfo::process_full_gc_size()));
+            PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()));
     Defer defer {[&]() {
         std::stringstream ss;
         profile->pretty_print(&ss);
@@ -191,8 +191,8 @@ bool MemoryReclamation::revoke_process_memory(const 
std::string& revoke_reason)
                 "[MemoryGC] end MemoryReclamation::revoke_process_memory, {}, 
need free size: {}, "
                 "free Memory {}. cost(us): {}, details: {}",
                 GlobalMemoryArbitrator::process_mem_log_str(),
-                MemCounter::print_bytes(MemInfo::process_full_gc_size()),
-                MemCounter::print_bytes(freed_mem), watch.elapsed_time() / 
1000, ss.str());
+                PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()),
+                PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 
1000, ss.str());
     }};
 
     // step1: start canceling from the query with the largest memory usage 
until the memory of process_full_gc_size is freed.
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index fdafd172878..44cbfbc0597 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -82,8 +82,21 @@ public:
     void consume(int64_t size);
     void flush_untracked_mem();
 
+    enum class TryReserveChecker {
+        NONE = 0,
+        CHECK_TASK = 1,
+        CHECK_WORKLOAD_GROUP = 2,
+        CHECK_TASK_AND_WORKLOAD_GROUP = 3,
+        CHECK_PROCESS = 4,
+        CHECK_TASK_AND_PROCESS = 5,
+        CHECK_WORKLOAD_GROUP_AND_PROCESS = 6,
+        CHECK_TASK_AND_WORKLOAD_GROUP_AND_PROCESS = 7,
+    };
+
     // if only_check_process_memory == true, still reserve query, wg, process 
memory, only check process memory.
-    MOCK_FUNCTION doris::Status try_reserve(int64_t size, bool 
only_check_process_memory = false);
+    MOCK_FUNCTION doris::Status try_reserve(
+            int64_t size, TryReserveChecker checker =
+                                  
TryReserveChecker::CHECK_TASK_AND_WORKLOAD_GROUP_AND_PROCESS);
 
     void shrink_reserved();
 
@@ -290,8 +303,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
     _stop_consume = false;
 }
 
-inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size,
-                                                      bool 
only_check_process_memory) {
+inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size, 
TryReserveChecker checker) {
     DCHECK(size >= 0);
     CHECK(init());
     DCHECK(_limiter_tracker);
@@ -301,48 +313,61 @@ inline doris::Status 
ThreadMemTrackerMgr::try_reserve(int64_t size,
     flush_untracked_mem();
     auto wg_ptr = _wg_wptr.lock();
 
-    if (only_check_process_memory) {
-        _limiter_tracker->reserve(size);
-        if (wg_ptr) {
-            wg_ptr->add_wg_refresh_interval_memory_growth(size);
-        }
-    } else {
+    bool task_limit_checker = static_cast<int>(checker) & 1;
+    bool workload_group_limit_checker = static_cast<int>(checker) & 2;
+    bool process_limit_checker = static_cast<int>(checker) & 4;
+
+    if (task_limit_checker) {
         if (!_limiter_tracker->try_reserve(size)) {
             auto err_msg = fmt::format(
                     "reserve memory failed, size: {}, because query memory 
exceeded, memory "
-                    "tracker "
-                    "consumption: {}, limit: {}",
-                    PrettyPrinter::print(size, TUnit::BYTES),
-                    PrettyPrinter::print(_limiter_tracker->consumption(), 
TUnit::BYTES),
-                    PrettyPrinter::print(_limiter_tracker->limit(), 
TUnit::BYTES));
+                    "tracker: {}, "
+                    "consumption: {}, limit: {}, peak: {}",
+                    PrettyPrinter::print_bytes(size), 
_limiter_tracker->label(),
+                    
PrettyPrinter::print_bytes(_limiter_tracker->consumption()),
+                    PrettyPrinter::print_bytes(_limiter_tracker->limit()),
+                    
PrettyPrinter::print_bytes(_limiter_tracker->peak_consumption()));
             return 
doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
         }
-        if (wg_ptr) {
+    } else {
+        _limiter_tracker->reserve(size);
+    }
+
+    if (wg_ptr) {
+        if (workload_group_limit_checker) {
             if (!wg_ptr->try_add_wg_refresh_interval_memory_growth(size)) {
                 auto err_msg = fmt::format(
                         "reserve memory failed, size: {}, because workload 
group memory exceeded, "
                         "workload group: {}",
-                        PrettyPrinter::print(size, TUnit::BYTES), 
wg_ptr->memory_debug_string());
+                        PrettyPrinter::print_bytes(size), 
wg_ptr->memory_debug_string());
                 _limiter_tracker->release(size);         // rollback
                 _limiter_tracker->shrink_reserved(size); // rollback
                 return 
doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
             }
+        } else {
+            wg_ptr->add_wg_refresh_interval_memory_growth(size);
         }
     }
 
-    if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
-        auto err_msg =
-                fmt::format("reserve memory failed, size: {}, because proccess 
memory exceeded, {}",
-                            PrettyPrinter::print(size, TUnit::BYTES),
-                            GlobalMemoryArbitrator::process_mem_log_str());
-        _limiter_tracker->release(size);         // rollback
-        _limiter_tracker->shrink_reserved(size); // rollback
-        if (wg_ptr) {
-            wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
+    if (process_limit_checker) {
+        if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
+            auto err_msg = fmt::format(
+                    "reserve memory failed, size: {}, because proccess memory 
exceeded, {}",
+                    PrettyPrinter::print_bytes(size),
+                    GlobalMemoryArbitrator::process_mem_log_str());
+            _limiter_tracker->release(size);         // rollback
+            _limiter_tracker->shrink_reserved(size); // rollback
+            if (wg_ptr) {
+                wg_ptr->sub_wg_refresh_interval_memory_growth(size); // 
rollback
+            }
+            return 
doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(err_msg);
         }
-        return 
doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(err_msg);
+    } else {
+        doris::GlobalMemoryArbitrator::reserve_process_memory(size);
     }
+
     _reserved_mem += size;
+    DCHECK(_reserved_mem >= 0);
     return doris::Status::OK();
 }
 
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 2853f2356ca..82124e75552 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -202,9 +202,9 @@ QueryContext::~QueryContext() {
         mem_tracker_msg = fmt::format(
                 "deregister query/load memory tracker, queryId={}, Limit={}, 
CurrUsed={}, "
                 "PeakUsed={}",
-                print_id(_query_id), 
MemCounter::print_bytes(query_mem_tracker()->limit()),
-                MemCounter::print_bytes(query_mem_tracker()->consumption()),
-                
MemCounter::print_bytes(query_mem_tracker()->peak_consumption()));
+                print_id(_query_id), 
PrettyPrinter::print_bytes(query_mem_tracker()->limit()),
+                PrettyPrinter::print_bytes(query_mem_tracker()->consumption()),
+                
PrettyPrinter::print_bytes(query_mem_tracker()->peak_consumption()));
     }
     [[maybe_unused]] uint64_t group_id = 0;
     if (workload_group()) {
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 794a9087d00..83690a79eb0 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -257,18 +257,18 @@ int64_t WorkloadGroup::revoke_memory(int64_t 
need_free_mem, const std::string& r
 
     auto group_revoke_reason = fmt::format(
             "{}, revoke group id:{}, name:{}, used:{}, limit:{}", 
revoke_reason, _id, _name,
-            MemCounter::print_bytes(used_memory), 
MemCounter::print_bytes(_memory_limit));
+            PrettyPrinter::print_bytes(used_memory), 
PrettyPrinter::print_bytes(_memory_limit));
     LOG(INFO) << fmt::format(
             "[MemoryGC] start WorkloadGroup::revoke_memory, {}, need free 
size: {}.",
-            group_revoke_reason, MemCounter::print_bytes(need_free_mem));
+            group_revoke_reason, PrettyPrinter::print_bytes(need_free_mem));
     Defer defer {[&]() {
         std::stringstream ss;
         group_revoke_profile->pretty_print(&ss);
         LOG(INFO) << fmt::format(
                 "[MemoryGC] end WorkloadGroup::revoke_memory, {}, need free 
size: {}, free Memory "
                 "{}. cost(us): {}, details: {}",
-                group_revoke_reason, MemCounter::print_bytes(need_free_mem),
-                MemCounter::print_bytes(freed_mem), watch.elapsed_time() / 
1000, ss.str());
+                group_revoke_reason, PrettyPrinter::print_bytes(need_free_mem),
+                PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 
1000, ss.str());
     }};
 
     // step 1. free top overcommit query
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 8768541b909..5b47535e464 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -699,14 +699,14 @@ int64_t 
WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_(
             "[MemoryGC] start 
WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_, {}, "
             "number of overcommited groups: {}, total exceeded memory: {}.",
             revoke_reason, exceeded_memory_heap.size(),
-            MemCounter::print_bytes(total_exceeded_memory));
+            PrettyPrinter::print_bytes(total_exceeded_memory));
     Defer defer {[&]() {
         std::stringstream ss;
         profile->pretty_print(&ss);
         LOG(INFO) << fmt::format(
                 "[MemoryGC] end 
WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_, "
                 "{}, number of overcommited groups: {}, free memory {}. 
cost(us): {}, details: {}",
-                revoke_reason, exceeded_memory_heap.size(), 
MemCounter::print_bytes(freed_mem),
+                revoke_reason, exceeded_memory_heap.size(), 
PrettyPrinter::print_bytes(freed_mem),
                 watch.elapsed_time() / 1000, ss.str());
     }};
 
diff --git a/be/src/runtime/workload_management/memory_context.cpp 
b/be/src/runtime/workload_management/memory_context.cpp
index 211d3539a3c..e1198c24f5c 100644
--- a/be/src/runtime/workload_management/memory_context.cpp
+++ b/be/src/runtime/workload_management/memory_context.cpp
@@ -25,9 +25,9 @@ namespace doris {
 std::string MemoryContext::debug_string() {
     return fmt::format("TaskId={}, Memory(Used={}, Limit={}, Peak={})",
                        print_id(resource_ctx_->task_controller()->task_id()),
-                       MemCounter::print_bytes(current_memory_bytes()),
-                       MemCounter::print_bytes(mem_limit()),
-                       MemCounter::print_bytes(peak_memory_bytes()));
+                       PrettyPrinter::print_bytes(current_memory_bytes()),
+                       PrettyPrinter::print_bytes(mem_limit()),
+                       PrettyPrinter::print_bytes(peak_memory_bytes()));
 }
 
 #include "common/compile_check_end.h"
diff --git a/be/src/runtime/workload_management/task_controller.cpp 
b/be/src/runtime/workload_management/task_controller.cpp
index a025f44fbba..b65c9a57d5b 100644
--- a/be/src/runtime/workload_management/task_controller.cpp
+++ b/be/src/runtime/workload_management/task_controller.cpp
@@ -42,9 +42,9 @@ std::string TaskController::debug_string() {
             "TaskId={}, Memory(Used={}, Limit={}, Peak={}), 
Spill(RunningSpillTaskCnt={}, "
             "TotalPausedPeriodSecs={}, LatestPausedReason={})",
             print_id(task_id_),
-            
MemCounter::print_bytes(resource_ctx_->memory_context()->current_memory_bytes()),
-            
MemCounter::print_bytes(resource_ctx_->memory_context()->mem_limit()),
-            
MemCounter::print_bytes(resource_ctx_->memory_context()->peak_memory_bytes()),
+            
PrettyPrinter::print_bytes(resource_ctx_->memory_context()->current_memory_bytes()),
+            
PrettyPrinter::print_bytes(resource_ctx_->memory_context()->mem_limit()),
+            
PrettyPrinter::print_bytes(resource_ctx_->memory_context()->peak_memory_bytes()),
             revoking_tasks_count_, memory_sufficient_time() / NANOS_PER_SEC,
             paused_reason_.status().to_string());
 }
diff --git a/be/src/util/pretty_printer.h b/be/src/util/pretty_printer.h
index 5e0d6fdfc9e..69cef37da31 100644
--- a/be/src/util/pretty_printer.h
+++ b/be/src/util/pretty_printer.h
@@ -186,7 +186,8 @@ public:
 
     /// Convenience method
     static std::string print_bytes(int64_t value) {
-        return PrettyPrinter::print(value, TUnit::BYTES);
+        return value >= 0 ? PrettyPrinter::print(value, TUnit::BYTES)
+                          : "-" + PrettyPrinter::print(std::abs(value), 
TUnit::BYTES);
     }
 
 private:
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index c3e9dffaaa6..6ae215e993d 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -46,14 +46,54 @@ std::unordered_map<void*, size_t> 
RecordSizeMemoryAllocator::_allocated_sizes;
 std::mutex RecordSizeMemoryAllocator::_mutex;
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
-void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::sys_memory_check(
-        size_t size) const {
+bool Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::sys_memory_exceed(
+        size_t size, std::string* err_msg) const {
 #ifdef BE_TEST
-    if (!doris::ExecEnv::ready()) {
+    if (!doris::pthread_context_ptr_init) {
+        return false;
+    }
+#endif
+    auto* thread_mem_ctx = 
doris::thread_context()->thread_mem_tracker_mgr.get();
+    if (thread_mem_ctx->skip_memory_check != 0) {
+        return false;
+    }
+
+    if (doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) {
+        // Only thread attach task, and has not completely waited for 
thread_wait_gc_max_milliseconds,
+        // will wait for gc. otherwise, if the outside will catch the 
exception, throwing an exception.
+        *err_msg += fmt::format(
+                "Allocator sys memory check failed: Cannot alloc:{}, consuming 
"
+                "tracker:<{}>, peak used {}, current used {}, reserved {}, 
exec node:<{}>, {}.",
+                doris::PrettyPrinter::print_bytes(size),
+                thread_mem_ctx->limiter_mem_tracker()->label(),
+                doris::PrettyPrinter::print_bytes(
+                        
thread_mem_ctx->limiter_mem_tracker()->peak_consumption()),
+                doris::PrettyPrinter::print_bytes(
+                        thread_mem_ctx->limiter_mem_tracker()->consumption()),
+                doris::PrettyPrinter::print_bytes(
+                        
thread_mem_ctx->limiter_mem_tracker()->reserved_consumption()),
+                thread_mem_ctx->last_consumer_tracker_label(),
+                doris::GlobalMemoryArbitrator::process_mem_log_str());
+
+        if (doris::config::stacktrace_in_alloc_large_memory_bytes > 0 &&
+            size > doris::config::stacktrace_in_alloc_large_memory_bytes) {
+            *err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace();
+        }
+        return true;
+    }
+    return false;
+}
+
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::alloc_fault_probability()
+        const {
+#ifdef BE_TEST
+    if (!doris::pthread_context_ptr_init) {
         return;
     }
 #endif
-    if (doris::thread_context()->thread_mem_tracker_mgr->skip_memory_check != 
0) {
+    auto* thread_mem_ctx = 
doris::thread_context()->thread_mem_tracker_mgr.get();
+    if (thread_mem_ctx->skip_memory_check != 0) {
         return;
     }
 
@@ -65,9 +105,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::sys_mem
             const std::string injection_err_msg = fmt::format(
                     "[MemAllocInjectFault] Task {} alloc memory failed due to 
fault "
                     "injection.",
-                    doris::thread_context()
-                            ->thread_mem_tracker_mgr->limiter_mem_tracker()
-                            ->label());
+                    thread_mem_ctx->limiter_mem_tracker()->label());
             // Print stack trace for debug.
             [[maybe_unused]] auto stack_trace_st =
                     doris::Status::Error<doris::ErrorCode::MEM_ALLOC_FAILED, 
true>(
@@ -80,41 +118,19 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::sys_mem
             }
         }
     }
+}
 
-    if (doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) {
-        // Only thread attach task, and has not completely waited for 
thread_wait_gc_max_milliseconds,
-        // will wait for gc. otherwise, if the outside will catch the 
exception, throwing an exception.
-        std::string err_msg;
-        err_msg += fmt::format(
-                "Allocator sys memory check failed: Cannot alloc:{}, consuming 
"
-                "tracker:<{}>, peak used {}, current used {}, reserved {}, 
exec node:<{}>, {}.",
-                doris::PrettyPrinter::print_bytes(size),
-                
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(),
-                doris::PrettyPrinter::print_bytes(
-                        doris::thread_context()
-                                ->thread_mem_tracker_mgr->limiter_mem_tracker()
-                                ->peak_consumption()),
-                doris::PrettyPrinter::print_bytes(
-                        doris::thread_context()
-                                ->thread_mem_tracker_mgr->limiter_mem_tracker()
-                                ->consumption()),
-                doris::PrettyPrinter::print_bytes(
-                        doris::thread_context()
-                                ->thread_mem_tracker_mgr->limiter_mem_tracker()
-                                ->reserved_consumption()),
-                
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(),
-                doris::GlobalMemoryArbitrator::process_mem_log_str());
-
-        if (doris::config::stacktrace_in_alloc_large_memory_bytes > 0 &&
-            size > doris::config::stacktrace_in_alloc_large_memory_bytes) {
-            err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace();
-        }
-
-        if (doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::sys_memory_check(
+        size_t size) const {
+    std::string err_msg;
+    if (sys_memory_exceed(size, &err_msg)) {
+        auto* thread_mem_ctx = 
doris::thread_context()->thread_mem_tracker_mgr.get();
+        if (thread_mem_ctx->wait_gc()) {
             int64_t wait_milliseconds = 0;
             LOG(INFO) << fmt::format(
                     "Task:{} waiting for enough memory in thread id:{}, 
maximum {}ms, {}.",
-                    
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(),
+                    thread_mem_ctx->limiter_mem_tracker()->label(),
                     doris::ThreadContext::get_thread_id(),
                     doris::config::thread_wait_gc_max_milliseconds, err_msg);
 
@@ -144,7 +160,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::sys_mem
             // else, enough memory is available, the task continues execute.
             if (wait_milliseconds >= 
doris::config::thread_wait_gc_max_milliseconds) {
                 // Make sure to completely wait 
thread_wait_gc_max_milliseconds only once.
-                
doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
+                thread_mem_ctx->disable_wait_gc();
                 
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
 
                 // If the outside will catch the exception, after throwing an 
exception,
@@ -153,62 +169,61 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::sys_mem
                     LOG(INFO) << fmt::format(
                             "Task:{} sys memory check failed, throw exception, 
after waiting for "
                             "memory {}ms, {}.",
-                            doris::thread_context()
-                                    
->thread_mem_tracker_mgr->limiter_mem_tracker()
-                                    ->label(),
-                            wait_milliseconds, err_msg);
+                            thread_mem_ctx->limiter_mem_tracker()->label(), 
wait_milliseconds,
+                            err_msg);
                     throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, 
err_msg);
                 } else {
                     LOG(INFO) << fmt::format(
                             "Task:{} sys memory check failed, will continue to 
execute, cannot "
                             "throw exception, after waiting for memory {}ms, 
{}.",
-                            doris::thread_context()
-                                    
->thread_mem_tracker_mgr->limiter_mem_tracker()
-                                    ->label(),
-                            wait_milliseconds, err_msg);
+                            thread_mem_ctx->limiter_mem_tracker()->label(), 
wait_milliseconds,
+                            err_msg);
                 }
             }
         } else if (doris::enable_thread_catch_bad_alloc) {
-            LOG(INFO) << fmt::format("sys memory check failed, throw 
exception, {}.", err_msg);
+            LOG(INFO) << fmt::format("{}, throw exception.", err_msg);
             throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, 
err_msg);
         } else {
-            LOG(INFO) << fmt::format("sys memory check failed, no throw 
exception, {}.", err_msg);
+            LOG(INFO) << fmt::format("{}, no throw exception.", err_msg);
         }
     }
 }
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
-void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::memory_tracker_check(
-        size_t size) const {
+bool Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::memory_tracker_exceed(
+        size_t size, std::string* err_msg) const {
 #ifdef BE_TEST
-    if (!doris::ExecEnv::ready()) {
-        return;
+    if (!doris::pthread_context_ptr_init) {
+        return false;
     }
 #endif
-    if (doris::thread_context()->thread_mem_tracker_mgr->skip_memory_check != 
0) {
-        return;
+    auto* thread_mem_ctx = 
doris::thread_context()->thread_mem_tracker_mgr.get();
+    if (thread_mem_ctx->skip_memory_check != 0) {
+        return false;
     }
-    auto st = 
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
-            size);
+
+    auto st = thread_mem_ctx->limiter_mem_tracker()->check_limit(size);
     if (!st) {
-        auto err_msg = fmt::format("Allocator mem tracker check failed, {}", 
st.to_string());
-        
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->print_log_usage(
-                err_msg);
+        *err_msg += fmt::format("Allocator mem tracker check failed, {}", 
st.to_string());
+        thread_mem_ctx->limiter_mem_tracker()->print_log_usage(*err_msg);
+        return true;
+    }
+    return false;
+}
+
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::memory_tracker_check(
+        size_t size) const {
+    std::string err_msg;
+    if (memory_tracker_exceed(size, &err_msg)) {
         doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
         // If the outside will catch the exception, after throwing an 
exception,
         // the task will actively cancel itself.
         if (doris::enable_thread_catch_bad_alloc) {
-            LOG(INFO) << fmt::format(
-                    "Task:{} memory tracker check failed, throw exception, 
{}.",
-                    
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(),
-                    err_msg);
+            LOG(INFO) << fmt::format("{}, throw exception.", err_msg);
             throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, 
err_msg);
         } else {
-            LOG(INFO) << fmt::format(
-                    "Task:{} memory tracker check failed, will continue to 
execute, no throw "
-                    "exception, {}.",
-                    
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(),
-                    err_msg);
+            LOG(INFO) << fmt::format("{}, will continue to execute, no throw 
exception.", err_msg);
         }
     }
 }
@@ -216,20 +231,27 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::memory_
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::memory_check(
         size_t size) const {
-    sys_memory_check(size);
-    memory_tracker_check(size);
+    if (MemoryAllocator::need_check_and_tracking_memory()) {
+        alloc_fault_probability();
+        sys_memory_check(size);
+        memory_tracker_check(size);
+    }
 }
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::consume_memory(
         size_t size) const {
-    CONSUME_THREAD_MEM_TRACKER(size);
+    if (MemoryAllocator::need_check_and_tracking_memory()) {
+        CONSUME_THREAD_MEM_TRACKER(size);
+    }
 }
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::release_memory(
         size_t size) const {
-    RELEASE_THREAD_MEM_TRACKER(size);
+    if (MemoryAllocator::need_check_and_tracking_memory()) {
+        RELEASE_THREAD_MEM_TRACKER(size);
+    }
 }
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
@@ -246,11 +268,6 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::throw_b
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::add_address_sanitizers(
         void* buf, size_t size) const {
-#ifdef BE_TEST
-    if (!doris::ExecEnv::ready()) {
-        return;
-    }
-#endif
     if (!doris::config::crash_in_memory_tracker_inaccurate) {
         return;
     }
@@ -261,11 +278,6 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::add_add
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::remove_address_sanitizers(
         void* buf, size_t size) const {
-#ifdef BE_TEST
-    if (!doris::ExecEnv::ready()) {
-        return;
-    }
-#endif
     if (!doris::config::crash_in_memory_tracker_inaccurate) {
         return;
     }
@@ -277,13 +289,156 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::remove_
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 void* Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::alloc(size_t size,
                                                                                
 size_t alignment) {
-    return alloc_impl(size, alignment);
+    memory_check(size);
+    // consume memory in tracker before alloc, similar to early declaration.
+    consume_memory(size);
+    void* buf;
+    size_t record_size = size;
+
+    if (use_mmap && size >= doris::config::mmap_threshold) {
+        if (alignment > MMAP_MIN_ALIGNMENT) {
+            throw doris::Exception(
+                    doris::ErrorCode::INVALID_ARGUMENT,
+                    "Too large alignment {}: more than page size when 
allocating {}.", alignment,
+                    size);
+        }
+
+        buf = mmap(nullptr, size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
+        if (MAP_FAILED == buf) {
+            release_memory(size);
+            throw_bad_alloc(fmt::format("Allocator: Cannot mmap {}.", size));
+        }
+        if constexpr (MemoryAllocator::need_record_actual_size()) {
+            record_size = MemoryAllocator::allocated_size(buf);
+        }
+
+        /// No need for zero-fill, because mmap guarantees it.
+    } else {
+        if (alignment <= MALLOC_MIN_ALIGNMENT) {
+            if constexpr (clear_memory) {
+                buf = MemoryAllocator::calloc(size, 1);
+            } else {
+                buf = MemoryAllocator::malloc(size);
+            }
+
+            if (nullptr == buf) {
+                release_memory(size);
+                throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", 
size));
+            }
+            if constexpr (MemoryAllocator::need_record_actual_size()) {
+                record_size = MemoryAllocator::allocated_size(buf);
+            }
+            add_address_sanitizers(buf, record_size);
+        } else {
+            buf = nullptr;
+            int res = MemoryAllocator::posix_memalign(&buf, alignment, size);
+
+            if (0 != res) {
+                release_memory(size);
+                throw_bad_alloc(fmt::format("Cannot allocate memory 
(posix_memalign) {}.", size));
+            }
+
+            if constexpr (clear_memory) {
+                memset(buf, 0, size);
+            }
+
+            if constexpr (MemoryAllocator::need_record_actual_size()) {
+                record_size = MemoryAllocator::allocated_size(buf);
+            }
+            add_address_sanitizers(buf, record_size);
+        }
+    }
+    if constexpr (MemoryAllocator::need_record_actual_size()) {
+        consume_memory(record_size - size);
+    }
+    return buf;
+}
+
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::free(void* buf,
+                                                                              
size_t size) {
+    if (use_mmap && size >= doris::config::mmap_threshold) {
+        if (0 != munmap(buf, size)) {
+            throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size));
+        }
+    } else {
+        remove_address_sanitizers(buf, size);
+        MemoryAllocator::free(buf);
+    }
+    release_memory(size);
 }
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 void* Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::realloc(
         void* buf, size_t old_size, size_t new_size, size_t alignment) {
-    return realloc_impl(buf, old_size, new_size, alignment);
+    if (old_size == new_size) {
+        /// nothing to do.
+        /// BTW, it's not possible to change alignment while doing realloc.
+        return buf;
+    }
+    memory_check(new_size);
+    // Realloc can do 2 possible things:
+    // - expand existing memory region
+    // - allocate new memory block and free the old one
+    // Because we don't know which option will be picked we need to make sure 
there is enough
+    // memory for all options.
+    consume_memory(new_size);
+
+    if (!use_mmap ||
+        (old_size < doris::config::mmap_threshold && new_size < 
doris::config::mmap_threshold &&
+         alignment <= MALLOC_MIN_ALIGNMENT)) {
+        remove_address_sanitizers(buf, old_size);
+        /// Resize malloc'd memory region with no special alignment 
requirement.
+        void* new_buf = MemoryAllocator::realloc(buf, new_size);
+        if (nullptr == new_buf) {
+            release_memory(new_size);
+            throw_bad_alloc(
+                    fmt::format("Allocator: Cannot realloc from {} to {}.", 
old_size, new_size));
+        }
+        // usually, buf addr = new_buf addr, asan maybe not equal.
+        add_address_sanitizers(new_buf, new_size);
+
+        buf = new_buf;
+        release_memory(old_size);
+        if constexpr (clear_memory) {
+            if (new_size > old_size) {
+                memset(reinterpret_cast<char*>(buf) + old_size, 0, new_size - 
old_size);
+            }
+        }
+    } else if (old_size >= doris::config::mmap_threshold &&
+               new_size >= doris::config::mmap_threshold) {
+        /// Resize mmap'd memory region.
+        // On apple and freebsd self-implemented mremap used (common/mremap.h)
+        buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, 
PROT_READ | PROT_WRITE,
+                                mmap_flags, -1, 0);
+        if (MAP_FAILED == buf) {
+            release_memory(new_size);
+            throw_bad_alloc(fmt::format("Allocator: Cannot mremap memory chunk 
from {} to {}.",
+                                        old_size, new_size));
+        }
+        release_memory(old_size);
+
+        /// No need for zero-fill, because mmap guarantees it.
+
+        if constexpr (mmap_populate) {
+            // MAP_POPULATE seems have no effect for mremap as for mmap,
+            // Clear enlarged memory range explicitly to pre-fault the pages
+            if (new_size > old_size) {
+                memset(reinterpret_cast<char*>(buf) + old_size, 0, new_size - 
old_size);
+            }
+        }
+    } else {
+        // Big allocs that requires a copy.
+        void* new_buf = alloc(new_size, alignment);
+        memcpy(new_buf, buf, std::min(old_size, new_size));
+        add_address_sanitizers(new_buf, new_size);
+        remove_address_sanitizers(buf, old_size);
+        free(buf, old_size);
+        buf = new_buf;
+        release_memory(old_size);
+    }
+
+    return buf;
 }
 
 template class Allocator<true, true, true, DefaultMemoryAllocator>;
@@ -295,6 +450,15 @@ template class Allocator<false, true, false, 
DefaultMemoryAllocator>;
 template class Allocator<false, false, true, DefaultMemoryAllocator>;
 template class Allocator<false, false, false, DefaultMemoryAllocator>;
 
+template class Allocator<true, true, true, NoTrackingDefaultMemoryAllocator>;
+template class Allocator<true, true, false, NoTrackingDefaultMemoryAllocator>;
+template class Allocator<true, false, true, NoTrackingDefaultMemoryAllocator>;
+template class Allocator<true, false, false, NoTrackingDefaultMemoryAllocator>;
+template class Allocator<false, true, true, NoTrackingDefaultMemoryAllocator>;
+template class Allocator<false, true, false, NoTrackingDefaultMemoryAllocator>;
+template class Allocator<false, false, true, NoTrackingDefaultMemoryAllocator>;
+template class Allocator<false, false, false, 
NoTrackingDefaultMemoryAllocator>;
+
 /** It would be better to put these Memory Allocators where they are used, 
such as in the orc memory pool and arrow memory pool.
   * But currently allocators use templates in .cpp instead of all in .h, so 
they can only be placed here.
   */
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 881e05c3117..ab3e699c617 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -112,6 +112,13 @@ public:
                   nullptr, 0);
 #endif // defined(USE_JEMALLOC)
     }
+
+    static constexpr bool need_check_and_tracking_memory() { return true; }
+};
+
+class NoTrackingDefaultMemoryAllocator : public DefaultMemoryAllocator {
+public:
+    static constexpr bool need_check_and_tracking_memory() { return false; }
 };
 
 /** It would be better to put these Memory Allocators where they are used, 
such as in the orc memory pool and arrow memory pool.
@@ -139,6 +146,8 @@ public:
     static void free(void* p) __THROW { std::free(p); }
 
     static void release_unused() {}
+
+    static constexpr bool need_check_and_tracking_memory() { return true; }
 };
 
 class RecordSizeMemoryAllocator {
@@ -208,6 +217,8 @@ public:
 
     static void release_unused() {}
 
+    static constexpr bool need_check_and_tracking_memory() { return true; }
+
 private:
     static std::unordered_map<void*, size_t> _allocated_sizes;
     static std::mutex _mutex;
@@ -226,11 +237,41 @@ private:
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
 class Allocator {
 public:
+    // Allocate memory range.
+    void* alloc(size_t size, size_t alignment = 0);
+
+    /** Enlarge memory range.
+      * Data from old range is moved to the beginning of new range.
+      * Address of memory range could change.
+      */
+    void* realloc(void* buf, size_t old_size, size_t new_size, size_t 
alignment = 0);
+
+    // Free memory range.
+    void free(void* buf, size_t size);
+
+    void release_unused() { MemoryAllocator::release_unused(); }
+
+    bool sys_memory_exceed(size_t size, std::string* err_msg) const;
+
+    bool memory_tracker_exceed(size_t size, std::string* err_msg) const;
+
+    static constexpr bool need_check_and_tracking_memory() {
+        return MemoryAllocator::need_check_and_tracking_memory();
+    }
+
+protected:
+    static constexpr size_t get_stack_threshold() { return 0; }
+
+    static constexpr bool clear_memory = clear_memory_;
+
+private:
     void sys_memory_check(size_t size) const;
     void memory_tracker_check(size_t size) const;
     // If sys memory or tracker exceeds the limit, but there is no external 
catch bad_alloc,
     // alloc will continue to execute, so the consume memtracker is forced.
     void memory_check(size_t size) const;
+    void alloc_fault_probability() const;
+
     // Increases consumption of this tracker by 'bytes'.
     // some special cases:
     // 1. objects that inherit Allocator will not be shared by multiple 
queries.
@@ -245,157 +286,6 @@ public:
     void add_address_sanitizers(void* buf, size_t size) const;
     void remove_address_sanitizers(void* buf, size_t size) const;
 
-    void* alloc(size_t size, size_t alignment = 0);
-    void* realloc(void* buf, size_t old_size, size_t new_size, size_t 
alignment = 0);
-
-    /// Allocate memory range.
-    void* alloc_impl(size_t size, size_t alignment = 0) {
-        memory_check(size);
-        // consume memory in tracker before alloc, similar to early 
declaration.
-        consume_memory(size);
-        void* buf;
-        size_t record_size = size;
-
-        if (use_mmap && size >= doris::config::mmap_threshold) {
-            if (alignment > MMAP_MIN_ALIGNMENT)
-                throw doris::Exception(
-                        doris::ErrorCode::INVALID_ARGUMENT,
-                        "Too large alignment {}: more than page size when 
allocating {}.",
-                        alignment, size);
-
-            buf = mmap(nullptr, size, PROT_READ | PROT_WRITE, mmap_flags, -1, 
0);
-            if (MAP_FAILED == buf) {
-                release_memory(size);
-                throw_bad_alloc(fmt::format("Allocator: Cannot mmap {}.", 
size));
-            }
-            if constexpr (MemoryAllocator::need_record_actual_size()) {
-                record_size = MemoryAllocator::allocated_size(buf);
-            }
-
-            /// No need for zero-fill, because mmap guarantees it.
-        } else {
-            if (alignment <= MALLOC_MIN_ALIGNMENT) {
-                if constexpr (clear_memory)
-                    buf = MemoryAllocator::calloc(size, 1);
-                else
-                    buf = MemoryAllocator::malloc(size);
-
-                if (nullptr == buf) {
-                    release_memory(size);
-                    throw_bad_alloc(fmt::format("Allocator: Cannot malloc 
{}.", size));
-                }
-                if constexpr (MemoryAllocator::need_record_actual_size()) {
-                    record_size = MemoryAllocator::allocated_size(buf);
-                }
-                add_address_sanitizers(buf, record_size);
-            } else {
-                buf = nullptr;
-                int res = MemoryAllocator::posix_memalign(&buf, alignment, 
size);
-
-                if (0 != res) {
-                    release_memory(size);
-                    throw_bad_alloc(
-                            fmt::format("Cannot allocate memory 
(posix_memalign) {}.", size));
-                }
-
-                if constexpr (clear_memory) memset(buf, 0, size);
-
-                if constexpr (MemoryAllocator::need_record_actual_size()) {
-                    record_size = MemoryAllocator::allocated_size(buf);
-                }
-                add_address_sanitizers(buf, record_size);
-            }
-        }
-        if constexpr (MemoryAllocator::need_record_actual_size()) {
-            consume_memory(record_size - size);
-        }
-        return buf;
-    }
-
-    /// Free memory range.
-    void free(void* buf, size_t size) {
-        if (use_mmap && size >= doris::config::mmap_threshold) {
-            if (0 != munmap(buf, size)) {
-                throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", 
size));
-            }
-        } else {
-            remove_address_sanitizers(buf, size);
-            MemoryAllocator::free(buf);
-        }
-        release_memory(size);
-    }
-
-    void release_unused() { MemoryAllocator::release_unused(); }
-
-    /** Enlarge memory range.
-      * Data from old range is moved to the beginning of new range.
-      * Address of memory range could change.
-      */
-    void* realloc_impl(void* buf, size_t old_size, size_t new_size, size_t 
alignment = 0) {
-        if (old_size == new_size) {
-            /// nothing to do.
-            /// BTW, it's not possible to change alignment while doing realloc.
-            return buf;
-        }
-        memory_check(new_size);
-        consume_memory(new_size - old_size);
-
-        if (!use_mmap ||
-            (old_size < doris::config::mmap_threshold && new_size < 
doris::config::mmap_threshold &&
-             alignment <= MALLOC_MIN_ALIGNMENT)) {
-            remove_address_sanitizers(buf, old_size);
-            /// Resize malloc'd memory region with no special alignment 
requirement.
-            void* new_buf = MemoryAllocator::realloc(buf, new_size);
-            if (nullptr == new_buf) {
-                release_memory(new_size - old_size);
-                throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} 
to {}.", old_size,
-                                            new_size));
-            }
-            // usually, buf addr = new_buf addr, asan maybe not equal.
-            add_address_sanitizers(new_buf, new_size);
-
-            buf = new_buf;
-            if constexpr (clear_memory)
-                if (new_size > old_size)
-                    memset(reinterpret_cast<char*>(buf) + old_size, 0, 
new_size - old_size);
-        } else if (old_size >= doris::config::mmap_threshold &&
-                   new_size >= doris::config::mmap_threshold) {
-            /// Resize mmap'd memory region.
-            // On apple and freebsd self-implemented mremap used 
(common/mremap.h)
-            buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, 
PROT_READ | PROT_WRITE,
-                                    mmap_flags, -1, 0);
-            if (MAP_FAILED == buf) {
-                release_memory(new_size - old_size);
-                throw_bad_alloc(fmt::format("Allocator: Cannot mremap memory 
chunk from {} to {}.",
-                                            old_size, new_size));
-            }
-
-            /// No need for zero-fill, because mmap guarantees it.
-
-            if constexpr (mmap_populate) {
-                // MAP_POPULATE seems have no effect for mremap as for mmap,
-                // Clear enlarged memory range explicitly to pre-fault the 
pages
-                if (new_size > old_size)
-                    memset(reinterpret_cast<char*>(buf) + old_size, 0, 
new_size - old_size);
-            }
-        } else {
-            // Big allocs that requires a copy.
-            void* new_buf = alloc(new_size, alignment);
-            memcpy(new_buf, buf, std::min(old_size, new_size));
-            add_address_sanitizers(new_buf, new_size);
-            remove_address_sanitizers(buf, old_size);
-            free(buf, old_size);
-            buf = new_buf;
-        }
-
-        return buf;
-    }
-
-protected:
-    static constexpr size_t get_stack_threshold() { return 0; }
-
-    static constexpr bool clear_memory = clear_memory_;
-
     // Freshly mmapped pages are copy-on-write references to a global zero 
page.
     // On the first write, a page fault occurs, and an actual writable page is
     // allocated. If we are going to use this memory soon, such as when 
resizing
@@ -452,6 +342,18 @@ public:
         return new_buf;
     }
 
+    bool sys_memory_exceed(size_t size, std::string* err_msg) const {
+        return Base::sys_memory_exceed(size, err_msg);
+    }
+
+    bool memory_tracker_exceed(size_t size, std::string* err_msg) const {
+        return Base::memory_tracker_exceed(size, err_msg);
+    }
+
+    static constexpr bool need_check_and_tracking_memory() {
+        return Base::need_check_and_tracking_memory();
+    }
+
 protected:
     static constexpr size_t get_stack_threshold() { return N; }
 };
diff --git a/be/src/vec/common/allocator_fwd.h 
b/be/src/vec/common/allocator_fwd.h
index b20e88aa6fc..42f039b3b2a 100644
--- a/be/src/vec/common/allocator_fwd.h
+++ b/be/src/vec/common/allocator_fwd.h
@@ -26,6 +26,8 @@
 #include <cstddef>
 namespace doris {
 class DefaultMemoryAllocator;
+class NoTrackingDefaultMemoryAllocator;
+
 template <bool clear_memory_, bool mmap_populate = false, bool use_mmap = 
false,
           typename MemoryAllocator = DefaultMemoryAllocator>
 class Allocator;
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index a8abce8385f..98823fffd42 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -33,6 +33,7 @@
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "runtime/thread_context.h"
 #include "vec/common/allocator.h" // IWYU pragma: keep
 #include "vec/common/memcpy_small.h"
 
@@ -82,6 +83,16 @@ inline size_t round_up_to_power_of_two_or_zero(size_t n) {
   *  and zero initialize -1th element. It allows to use -1th element that will 
have value 0.
   * This gives performance benefits when converting an array of offsets to 
array of sizes.
   *
+  * If reserve 4096 bytes, used 512 bytes, pad_left = 16, pad_right = 15, the 
structure of PODArray is as follows:
+  *
+  *         16 bytes          512 bytes                 3553 bytes             
               15 bytes
+  * pad_left ----- c_start -------------c_end ---------------------------- 
c_end_of_storage ------------- pad_right
+  *    ^                                        ^                              
                                ^
+  *    |                                        |                              
                                |
+  *    |                                    c_res_mem (usually > c_end && < 
c_end + PRE_GROWTH_SIZE)           |
+  *    |                                                                       
                                |
+  *    +-------------------------------------- allocated_bytes (4096 bytes) 
-----------------------------------+
+  *
   * Some methods using allocator have TAllocatorParams variadic arguments.
   * These arguments will be passed to corresponding methods of TAllocator.
   * Example: pointer to Arena, that is used for allocations.
@@ -98,6 +109,7 @@ inline size_t round_up_to_power_of_two_or_zero(size_t n) {
   * TODO Allow greater alignment than alignof(T). Example: array of char 
aligned to page size.
   */
 static constexpr size_t EmptyPODArraySize = 1024;
+static constexpr size_t PRE_GROWTH_SIZE = (1ULL << 20); // 1M
 extern const char empty_pod_array[EmptyPODArraySize];
 
 /** Base class that depend only on size of element, not on element itself.
@@ -122,6 +134,7 @@ protected:
     char* c_start = null; /// Does not include pad_left.
     char* c_end = null;
     char* c_end_of_storage = null; /// Does not include pad_right.
+    char* c_res_mem = null;
 
     /// The amount of memory occupied by the num_elements of the elements.
     static size_t byte_size(size_t num_elements) {
@@ -143,6 +156,42 @@ protected:
         return byte_size(num_elements) + pad_right + pad_left;
     }
 
+    inline void check_memory(int64_t size) {
+        std::string err_msg;
+        if (TAllocator::sys_memory_exceed(size, &err_msg) ||
+            TAllocator::memory_tracker_exceed(size, &err_msg)) {
+            err_msg = fmt::format("PODArray reserve memory failed, {}.", 
err_msg);
+            if (doris::enable_thread_catch_bad_alloc) {
+                LOG(WARNING) << err_msg;
+                throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, 
err_msg);
+            } else {
+                LOG_EVERY_N(WARNING, 1024) << err_msg;
+            }
+        }
+    }
+
+    inline void reset_resident_memory(const char* c_end_new) {
+        static_assert(!TAllocator::need_check_and_tracking_memory(),
+                      "TAllocator should specify 
`NoTrackingDefaultMemoryAllocator`");
+        if (UNLIKELY(c_end_new - c_res_mem > 0)) {
+            // - allocated_bytes = c_end_of_storage - c_start = 4 MB;
+            // - used_bytes = c_end_new - c_start = 2.1 MB;
+            // - last tracking_res_memory = c_res_mem - c_start = 1 MB;
+            // - res_mem_growth = min(allocated_bytes, 
integerRoundUp(used_bytes)) - last_tracking_res_memory = 3 - 1 = 2 MB;
+            // - update tracking_res_memory = 1 + 2 = 3 MB;
+            // so after each reset_resident_memory, tracking_res_memory >= 
used_bytes;
+            int64_t res_mem_growth =
+                    std::min(static_cast<size_t>(c_end_of_storage - c_start),
+                             integerRoundUp(c_end_new - c_start, 
PRE_GROWTH_SIZE)) -
+                    (c_res_mem - c_start);
+            check_memory(res_mem_growth);
+            CONSUME_THREAD_MEM_TRACKER(res_mem_growth);
+            c_res_mem = c_res_mem + res_mem_growth;
+        }
+    }
+
+    inline void reset_resident_memory() { reset_resident_memory(c_end); }
+
     void alloc_for_num_elements(size_t num_elements) {
         
alloc(round_up_to_power_of_two_or_zero(minimum_memory_for_elements(num_elements)));
     }
@@ -154,6 +203,7 @@ protected:
 
         c_start = allocated + pad_left;
         c_end = c_start;
+        c_res_mem = c_start;
         c_end_of_storage = allocated + bytes - pad_right;
 
         if (pad_left) memset(c_start - ELEMENT_SIZE, 0, ELEMENT_SIZE);
@@ -161,9 +211,8 @@ protected:
 
     void dealloc() {
         if (c_start == null) return;
-
         unprotect();
-
+        RELEASE_THREAD_MEM_TRACKER((c_res_mem - c_start));
         TAllocator::free(c_start - pad_left, allocated_bytes());
     }
 
@@ -177,13 +226,21 @@ protected:
         unprotect();
 
         ptrdiff_t end_diff = c_end - c_start;
-
+        ptrdiff_t res_mem_diff = c_res_mem - c_start;
+
+        // Realloc can do 2 possible things:
+        // - expand existing memory region
+        // - allocate new memory block and free the old one
+        // Because we don't know which option will be picked we need to make 
sure there is enough
+        // memory for all options.
+        check_memory(res_mem_diff);
         char* allocated = reinterpret_cast<char*>(
                 TAllocator::realloc(c_start - pad_left, allocated_bytes(), 
bytes,
                                     
std::forward<TAllocatorParams>(allocator_params)...));
 
         c_start = allocated + pad_left;
         c_end = c_start + end_diff;
+        c_res_mem = c_start + res_mem_diff;
         c_end_of_storage = allocated + bytes - pad_right;
     }
 
@@ -259,7 +316,10 @@ public:
         resize_assume_reserved(n);
     }
 
-    void resize_assume_reserved(const size_t n) { c_end = c_start + 
byte_size(n); }
+    void resize_assume_reserved(const size_t n) {
+        c_end = c_start + byte_size(n);
+        reset_resident_memory();
+    }
 
     const char* raw_data() const { return c_start; }
 
@@ -268,6 +328,7 @@ public:
         if (UNLIKELY(c_end == c_end_of_storage))
             
reserve_for_next_size(std::forward<TAllocatorParams>(allocator_params)...);
 
+        reset_resident_memory(c_end + byte_size(1));
         memcpy(c_end, ptr, ELEMENT_SIZE);
         c_end += byte_size(1);
     }
@@ -328,6 +389,7 @@ public:
     PODArray(size_t n) {
         this->alloc_for_num_elements(n);
         this->c_end += this->byte_size(n);
+        this->reset_resident_memory();
     }
 
     PODArray(size_t n, const T& x) {
@@ -379,25 +441,36 @@ public:
     const_iterator cend() const { return t_end(); }
 
     void* get_end_ptr() const { return this->c_end; }
-    void set_end_ptr(void* ptr) { this->c_end = (char*)ptr; }
+    void set_end_ptr(void* ptr) {
+        this->c_end = (char*)ptr;
+        this->reset_resident_memory();
+    }
 
     /// Same as resize, but zeroes new elements.
     void resize_fill(size_t n) {
         size_t old_size = this->size();
+        const auto new_size = this->byte_size(n);
         if (n > old_size) {
             this->reserve(n);
+            this->reset_resident_memory(this->c_start + new_size);
             memset(this->c_end, 0, this->byte_size(n - old_size));
+        } else {
+            this->reset_resident_memory(this->c_start + new_size);
         }
-        this->c_end = this->c_start + this->byte_size(n);
+        this->c_end = this->c_start + new_size;
     }
 
     void resize_fill(size_t n, const T& value) {
         size_t old_size = this->size();
+        const auto new_size = this->byte_size(n);
         if (n > old_size) {
             this->reserve(n);
+            this->reset_resident_memory(this->c_start + new_size);
             std::fill(t_end(), t_end() + n - old_size, value);
+        } else {
+            this->reset_resident_memory(this->c_start + new_size);
         }
-        this->c_end = this->c_start + this->byte_size(n);
+        this->c_end = this->c_start + new_size;
     }
 
     template <typename U, typename... TAllocatorParams>
@@ -406,6 +479,7 @@ public:
             
this->reserve_for_next_size(std::forward<TAllocatorParams>(allocator_params)...);
         }
 
+        this->reset_resident_memory(this->c_end + this->byte_size(1));
         new (t_end()) T(std::forward<U>(x));
         this->c_end += this->byte_size(1);
     }
@@ -417,6 +491,7 @@ public:
             if (UNLIKELY(this->c_end + growth_size > this->c_end_of_storage)) {
                 this->reserve(this->size() + num);
             }
+            this->reset_resident_memory(this->c_end + growth_size);
             std::fill(t_end(), t_end() + num, x);
             this->c_end = this->c_end + growth_size;
         }
@@ -425,8 +500,10 @@ public:
     template <typename U, typename... TAllocatorParams>
     void add_num_element_without_reserve(U&& x, uint32_t num,
                                          TAllocatorParams&&... 
allocator_params) {
+        const auto growth_size = sizeof(T) * num;
+        this->reset_resident_memory(this->c_end + growth_size);
         std::fill(t_end(), t_end() + num, x);
-        this->c_end += sizeof(T) * num;
+        this->c_end += growth_size;
     }
 
     /**
@@ -435,6 +512,7 @@ public:
      */
     template <typename U, typename... TAllocatorParams>
     void push_back_without_reserve(U&& x, TAllocatorParams&&... 
allocator_params) {
+        this->reset_resident_memory(this->c_end + this->byte_size(1));
         new (t_end()) T(std::forward<U>(x));
         this->c_end += this->byte_size(1);
     }
@@ -448,6 +526,7 @@ public:
             this->reserve_for_next_size();
         }
 
+        this->reset_resident_memory(this->c_end + this->byte_size(1));
         new (t_end()) T(std::forward<Args>(args)...);
         this->c_end += this->byte_size(1);
     }
@@ -479,6 +558,7 @@ public:
         static_assert(pad_right_ >= 15);
         insert_prepare(from_begin, from_end, 
std::forward<TAllocatorParams>(allocator_params)...);
         size_t bytes_to_copy = this->byte_size(from_end - from_begin);
+        this->reset_resident_memory(this->c_end + bytes_to_copy);
         memcpy_small_allow_read_write_overflow15(
                 this->c_end, reinterpret_cast<const void*>(&*from_begin), 
bytes_to_copy);
         this->c_end += bytes_to_copy;
@@ -492,6 +572,7 @@ public:
         }
         size_t bytes_to_move = this->byte_size(end() - it);
         insert_prepare(from_begin, from_end);
+        this->reset_resident_memory(this->c_end + bytes_to_copy);
 
         if (UNLIKELY(bytes_to_move)) {
             memmove(this->c_end + bytes_to_copy - bytes_to_move, this->c_end - 
bytes_to_move,
@@ -507,6 +588,7 @@ public:
     void insert_assume_reserved(It1 from_begin, It2 from_end) {
         this->assert_not_intersects(from_begin, from_end);
         size_t bytes_to_copy = this->byte_size(from_end - from_begin);
+        this->reset_resident_memory(this->c_end + bytes_to_copy);
         memcpy(this->c_end, reinterpret_cast<const void*>(&*from_begin), 
bytes_to_copy);
         this->c_end += bytes_to_copy;
     }
@@ -514,6 +596,7 @@ public:
     template <typename It1, typename It2>
     void insert_assume_reserved_and_allow_overflow(It1 from_begin, It2 
from_end) {
         size_t bytes_to_copy = this->byte_size(from_end - from_begin);
+        this->reset_resident_memory(this->c_end + bytes_to_copy);
         memcpy_small_allow_read_write_overflow15(
                 this->c_end, reinterpret_cast<const void*>(&*from_begin), 
bytes_to_copy);
         this->c_end += bytes_to_copy;
@@ -534,9 +617,11 @@ public:
         auto swap_stack_heap = [this](PODArray& arr1, PODArray& arr2) {
             size_t stack_size = arr1.size();
             size_t stack_allocated = arr1.allocated_bytes();
+            size_t stack_res_mem_used = arr1.c_res_mem - arr1.c_start;
 
             size_t heap_size = arr2.size();
             size_t heap_allocated = arr2.allocated_bytes();
+            size_t heap_res_mem_used = arr2.c_res_mem - arr2.c_start;
 
             /// Keep track of the stack content we have to copy.
             char* stack_c_start = arr1.c_start;
@@ -545,12 +630,14 @@ public:
             arr1.c_start = arr2.c_start;
             arr1.c_end_of_storage = arr1.c_start + heap_allocated - 
arr2.pad_right - arr2.pad_left;
             arr1.c_end = arr1.c_start + this->byte_size(heap_size);
+            arr1.c_res_mem = arr1.c_start + heap_res_mem_used;
 
             /// Allocate stack space for arr2.
             arr2.alloc(stack_allocated);
             /// Copy the stack content.
             memcpy(arr2.c_start, stack_c_start, this->byte_size(stack_size));
             arr2.c_end = arr2.c_start + this->byte_size(stack_size);
+            arr2.c_res_mem = arr2.c_start + stack_res_mem_used;
         };
 
         auto do_move = [this](PODArray& src, PODArray& dest) {
@@ -559,14 +646,17 @@ public:
                 dest.alloc(src.allocated_bytes());
                 memcpy(dest.c_start, src.c_start, this->byte_size(src.size()));
                 dest.c_end = dest.c_start + this->byte_size(src.size());
+                dest.c_res_mem = dest.c_start + (src.c_res_mem - src.c_start);
 
                 src.c_start = Base::null;
                 src.c_end = Base::null;
                 src.c_end_of_storage = Base::null;
+                src.c_res_mem = Base::null;
             } else {
                 std::swap(dest.c_start, src.c_start);
                 std::swap(dest.c_end, src.c_end);
                 std::swap(dest.c_end_of_storage, src.c_end_of_storage);
+                std::swap(dest.c_res_mem, src.c_res_mem);
             }
         };
 
@@ -594,9 +684,11 @@ public:
 
             size_t lhs_size = this->size();
             size_t lhs_allocated = this->allocated_bytes();
+            size_t lhs_res_mem_used = this->c_res_mem - this->c_start;
 
             size_t rhs_size = rhs.size();
             size_t rhs_allocated = rhs.allocated_bytes();
+            size_t rhs_res_mem_used = rhs.c_res_mem - rhs.c_start;
 
             this->c_end_of_storage =
                     this->c_start + rhs_allocated - Base::pad_right - 
Base::pad_left;
@@ -604,6 +696,9 @@ public:
 
             this->c_end = this->c_start + this->byte_size(rhs_size);
             rhs.c_end = rhs.c_start + this->byte_size(lhs_size);
+
+            this->c_res_mem = this->c_start + rhs_res_mem_used;
+            rhs.c_res_mem = rhs.c_start + lhs_res_mem_used;
         } else if (this->is_allocated_from_stack() && 
!rhs.is_allocated_from_stack()) {
             swap_stack_heap(*this, rhs);
         } else if (!this->is_allocated_from_stack() && 
rhs.is_allocated_from_stack()) {
@@ -612,6 +707,7 @@ public:
             std::swap(this->c_start, rhs.c_start);
             std::swap(this->c_end, rhs.c_end);
             std::swap(this->c_end_of_storage, rhs.c_end_of_storage);
+            std::swap(this->c_res_mem, rhs.c_res_mem);
         }
     }
 
@@ -628,6 +724,7 @@ public:
             this->reserve(round_up_to_power_of_two_or_zero(required_capacity));
 
         size_t bytes_to_copy = this->byte_size(required_capacity);
+        this->reset_resident_memory(this->c_start + bytes_to_copy);
         memcpy(this->c_start, reinterpret_cast<const void*>(&*from_begin), 
bytes_to_copy);
         this->c_end = this->c_start + bytes_to_copy;
     }
diff --git a/be/src/vec/common/pod_array_fwd.h 
b/be/src/vec/common/pod_array_fwd.h
index bd0c7e272e4..cd1ce16ab9e 100644
--- a/be/src/vec/common/pod_array_fwd.h
+++ b/be/src/vec/common/pod_array_fwd.h
@@ -32,7 +32,8 @@ inline constexpr size_t integerRoundUp(size_t value, size_t 
dividend) {
     return ((value + dividend - 1) / dividend) * dividend;
 }
 
-template <typename T, size_t initial_bytes = 4096, typename TAllocator = 
Allocator<false>,
+template <typename T, size_t initial_bytes = 4096,
+          typename TAllocator = Allocator<false, false, false, 
NoTrackingDefaultMemoryAllocator>,
           size_t pad_right_ = 0, size_t pad_left_ = 0>
 class PODArray;
 
@@ -40,7 +41,8 @@ class PODArray;
   * TODO, Adapt internal data structures to 512-bit era 
https://github.com/ClickHouse/ClickHouse/pull/42564
   *       Padding in internal data structures increased to 64 bytes., support 
AVX-512 simd.
   */
-template <typename T, size_t initial_bytes = 4096, typename TAllocator = 
Allocator<false>>
+template <typename T, size_t initial_bytes = 4096,
+          typename TAllocator = Allocator<false, false, false, 
NoTrackingDefaultMemoryAllocator>>
 using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 16, 15>;
 
 /** A helper for declaring PODArray that uses inline memory.
@@ -49,8 +51,9 @@ using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 
16, 15>;
   */
 template <typename T, size_t inline_bytes,
           size_t rounded_bytes = integerRoundUp(inline_bytes, sizeof(T))>
-using PODArrayWithStackMemory =
-        PODArray<T, rounded_bytes,
-                 AllocatorWithStackMemory<Allocator<false>, rounded_bytes, 
alignof(T)>>;
+using PODArrayWithStackMemory = PODArray<
+        T, rounded_bytes,
+        AllocatorWithStackMemory<Allocator<false, false, false, 
NoTrackingDefaultMemoryAllocator>,
+                                 rounded_bytes, alignof(T)>>;
 
 } // namespace doris::vectorized
diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp 
b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
index 84d0d617a96..3a69418971a 100644
--- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
+++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
@@ -341,7 +341,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
     int64_t size3 = size2 * 2;
 
     thread_context->attach_task(rc);
-    auto st = thread_context->thread_mem_tracker_mgr->try_reserve(size3, 
false);
+    auto st = thread_context->thread_mem_tracker_mgr->try_reserve(size3);
     EXPECT_TRUE(st.ok()) << st.to_string();
     EXPECT_EQ(t->consumption(), size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
diff --git a/be/test/testutil/mock/mock_thread_mem_tracker_mgr.h 
b/be/test/testutil/mock/mock_thread_mem_tracker_mgr.h
index 14c68c5be9c..ae7f856441c 100644
--- a/be/test/testutil/mock/mock_thread_mem_tracker_mgr.h
+++ b/be/test/testutil/mock/mock_thread_mem_tracker_mgr.h
@@ -23,7 +23,10 @@ namespace doris {
 class MockThreadMemTrackerMgr : public ThreadMemTrackerMgr {
 public:
     MockThreadMemTrackerMgr() : ThreadMemTrackerMgr() {}
-    Status try_reserve(int64_t size, bool only_check_process_memory = false) 
override {
+    Status try_reserve(
+            int64_t size,
+            TryReserveChecker checker =
+                    
TryReserveChecker::CHECK_TASK_AND_WORKLOAD_GROUP_AND_PROCESS) override {
         return _test_low_memory ? 
Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>("")
                                 : Status::OK();
     }
diff --git a/be/test/vec/common/pod_array_test.cpp 
b/be/test/vec/common/pod_array_test.cpp
index db022a12729..8139b1e9937 100644
--- a/be/test/vec/common/pod_array_test.cpp
+++ b/be/test/vec/common/pod_array_test.cpp
@@ -28,8 +28,11 @@ namespace doris {
 
 TEST(PODArrayTest, PODArrayBasicMove) {
     static constexpr size_t initial_bytes = 32;
-    using Array = vectorized::PODArray<uint64_t, initial_bytes,
-                                       
AllocatorWithStackMemory<Allocator<false>, initial_bytes>>;
+    using Array = vectorized::PODArray<
+            uint64_t, initial_bytes,
+            AllocatorWithStackMemory<
+                    Allocator<false, false, false, 
NoTrackingDefaultMemoryAllocator>,
+                    initial_bytes>>;
 
     {
         Array arr;
@@ -141,8 +144,11 @@ TEST(PODArrayTest, PODArrayBasicMove) {
 
 TEST(PODArrayTest, PODArrayBasicSwap) {
     static constexpr size_t initial_bytes = 32;
-    using Array = vectorized::PODArray<uint64_t, initial_bytes,
-                                       
AllocatorWithStackMemory<Allocator<false>, initial_bytes>>;
+    using Array = vectorized::PODArray<
+            uint64_t, initial_bytes,
+            AllocatorWithStackMemory<
+                    Allocator<false, false, false, 
NoTrackingDefaultMemoryAllocator>,
+                    initial_bytes>>;
 
     {
         Array arr;
@@ -383,8 +389,11 @@ TEST(PODArrayTest, PODArrayBasicSwap) {
 
 TEST(PODArrayTest, PODArrayBasicSwapMoveConstructor) {
     static constexpr size_t initial_bytes = 32;
-    using Array = vectorized::PODArray<uint64_t, initial_bytes,
-                                       
AllocatorWithStackMemory<Allocator<false>, initial_bytes>>;
+    using Array = vectorized::PODArray<
+            uint64_t, initial_bytes,
+            AllocatorWithStackMemory<
+                    Allocator<false, false, false, 
NoTrackingDefaultMemoryAllocator>,
+                    initial_bytes>>;
 
     {
         Array arr;
@@ -652,4 +661,274 @@ TEST(PODArrayTest, PODErase) {
     }
 }
 
+TEST(PODArrayTest, PaddedPODArrayTrackingMemory) {
+    auto t = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"UT");
+    //  PaddedPODArray
+    {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(t);
+        EXPECT_EQ(t->consumption(), 0);
+        static constexpr size_t PRE_GROWTH_SIZE = (1ULL << 20); // 1M
+
+        vectorized::PaddedPODArray<uint64_t, 4096> array;
+        size_t pad_right = vectorized::integerRoundUp(16, sizeof(uint64_t));
+        size_t pad_left =
+                vectorized::integerRoundUp(vectorized::integerRoundUp(15, 
sizeof(uint64_t)), 16);
+        EXPECT_EQ(t->consumption(), 0);
+
+        array.push_back(1);
+        EXPECT_EQ(array.size(), 1);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - 
pad_right);
+
+        array.push_back(2);
+        EXPECT_EQ(array.size(), 2);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - 
pad_right);
+
+        array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t));
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t));
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE);
+
+        array.push_back(3);
+        EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) + 1);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        // c_end_of_storage - c_start < integerRoundUp(c_end_new - c_start, 
PRE_GROWTH_SIZE)
+        EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - 
pad_right);
+
+        array.assign({1, 2, 3});
+        EXPECT_EQ(array.size(), 3);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - 
pad_right);
+
+        array.resize((PRE_GROWTH_SIZE / sizeof(uint64_t)) * 3);
+        EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 3);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 3);
+
+        array.add_num_element_without_reserve(1, 10);
+        EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 3 + 10);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - 
pad_right);
+
+        array.add_num_element(1, (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 2);
+        EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 5 + 10);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 6);
+
+        array.push_back_without_reserve(11);
+        EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 5 + 11);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 6);
+
+        array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t) * 6, 2);
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 6);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 6);
+
+        array.push_back_without_reserve(22);
+        EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 6 + 1);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 7);
+
+        array.emplace_back(33);
+        EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 6 + 2);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 7);
+
+        array.pop_back();
+        EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 6 + 1);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 7);
+
+        for (int i = 0; i < (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 2; i++) {
+            array.push_back(2);
+            array.pop_back();
+            array.pop_back();
+        }
+        EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 4 + 1);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 7);
+
+        array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t) * 7, 3);
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 7);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 7);
+
+        {
+            vectorized::PaddedPODArray<uint64_t, 32> array2;
+            array2.push_back(3);
+            array2.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t), 3);
+            array.insert(array2.begin(), array2.end());
+            
doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+            EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 9);
+        }
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 8);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 8);
+
+        {
+            vectorized::PaddedPODArray<uint64_t, 32> array2;
+            array2.resize_fill((PRE_GROWTH_SIZE / sizeof(uint64_t)) * 7, 3);
+            array.insert_small_allow_read_write_overflow15(array2.begin(), 
array2.end());
+            
doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+            EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 22);
+        }
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 15);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 15);
+
+        {
+            vectorized::PaddedPODArray<uint64_t, 32> array2;
+            array2.push_back(3);
+            array.insert(array2.begin(), array2.end());
+        }
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 15 + 1);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - 
pad_right);
+
+        array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t) * 16, 4);
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 16);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 16);
+
+        {
+            vectorized::PaddedPODArray<uint64_t, 32> array2;
+            array2.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t), 3);
+            array.insert(array.begin(), array2.begin(), array2.end());
+            
doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+            EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 18);
+        }
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 17);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 17);
+
+        {
+            vectorized::PaddedPODArray<uint64_t, 32> array2;
+            array2.resize_fill((PRE_GROWTH_SIZE / sizeof(uint64_t) * 2), 3);
+            array.insert_assume_reserved(array2.begin(), array2.end());
+            array.insert_assume_reserved_and_allow_overflow(array2.begin(), 
array2.end());
+            
doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+            EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 23);
+        }
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 21);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 21);
+
+        size_t n = 100;
+        array.assign(n, (uint64_t)0);
+        EXPECT_EQ(array.size(), 100);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 21);
+
+        array.add_num_element_without_reserve(1, PRE_GROWTH_SIZE / 
sizeof(uint64_t));
+        EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 1 + 
100);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 21);
+
+        array.erase(array.begin() + 100, array.end());
+        EXPECT_EQ(array.size(), 100);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 21);
+    }
+    EXPECT_EQ(t->consumption(), 0);
+}
+
+TEST(PODArrayTest, PODArrayTrackingMemory) {
+    auto t = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"UT");
+
+    // PODArray with AllocatorWithStackMemory
+    {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(t);
+        EXPECT_EQ(t->consumption(), 0);
+        static constexpr size_t PRE_GROWTH_SIZE = (1ULL << 20); // 1M
+
+        static constexpr size_t initial_bytes = 32;
+        using Array = vectorized::PODArray<
+                uint64_t, initial_bytes,
+                AllocatorWithStackMemory<
+                        Allocator<false, false, false, 
NoTrackingDefaultMemoryAllocator>>>;
+        Array array;
+
+        array.push_back(1);
+        EXPECT_EQ(array.size(), 1);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), array.allocated_bytes());
+
+        array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t), 1);
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t));
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE);
+
+        // test swap
+        size_t new_capacity = 0;
+        size_t new_size = 0;
+        {
+            Array array2;
+            array2.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t) * 2, 1);
+            EXPECT_EQ(array2.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 2);
+            
doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+            EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 3);
+            new_capacity = array2.capacity();
+            new_size = array2.size();
+
+            array.swap(array2);
+            EXPECT_EQ(array2.size(), PRE_GROWTH_SIZE / sizeof(uint64_t));
+            EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 2);
+            
doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+            EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 3);
+            EXPECT_EQ(array.capacity(), new_capacity);
+            EXPECT_EQ(array.size(), new_size);
+        }
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 2);
+    }
+    EXPECT_EQ(t->consumption(), 0);
+
+    // PODArray with Allocator
+    {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(t);
+        EXPECT_EQ(t->consumption(), 0);
+        static constexpr size_t PRE_GROWTH_SIZE = (1ULL << 20); // 1M
+
+        static constexpr size_t initial_bytes = 32;
+        using Array = vectorized::PODArray<
+                uint64_t, initial_bytes,
+                Allocator<false, false, false, 
NoTrackingDefaultMemoryAllocator>>;
+        Array array;
+
+        array.push_back(1);
+        EXPECT_EQ(array.size(), 1);
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), array.allocated_bytes());
+
+        array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t), 1);
+        EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t));
+        doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE);
+
+        // test swap
+        size_t new_capacity = 0;
+        size_t new_size = 0;
+        {
+            Array array2;
+            array2.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t) * 2, 1);
+            EXPECT_EQ(array2.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 2);
+            
doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+            EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 3);
+            new_capacity = array2.capacity();
+            new_size = array2.size();
+
+            array.swap(array2);
+            EXPECT_EQ(array2.size(), PRE_GROWTH_SIZE / sizeof(uint64_t));
+            EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 2);
+            
doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem();
+            EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 3);
+            EXPECT_EQ(array.capacity(), new_capacity);
+            EXPECT_EQ(array.size(), new_size);
+        }
+        EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 2);
+    }
+    EXPECT_EQ(t->consumption(), 0);
+}
+
 } // end namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to