This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch revert-12168-20220829_fix_podarray_resetpeak
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e7231d49e5a335a5ba5b838e6ea3349cf83db103
Author: yiguolei <676222...@qq.com>
AuthorDate: Tue Aug 30 14:57:36 2022 +0800

    Revert "[enhancement](memtracker) Improve performance of tracking real physical memory of PODArray #12168"
    
    This reverts commit 8370115cf61ad78e2ee76af53484085a12f676af.
---
 be/src/exec/hash_table.cpp                       |  2 +-
 be/src/exec/partitioned_aggregation_node.cc      |  6 +++---
 be/src/exec/partitioned_hash_table.cc            |  2 +-
 be/src/runtime/buffered_block_mgr2.cc            | 14 +++++++-------
 be/src/runtime/exec_env.h                        |  3 ---
 be/src/runtime/exec_env_init.cpp                 |  1 -
 be/src/runtime/mem_pool.cpp                      |  8 ++++----
 be/src/runtime/mem_pool.h                        |  4 ++--
 be/src/runtime/memory/mem_tracker.cpp            | 15 +++++++++------
 be/src/runtime/memory/mem_tracker_limiter.cpp    |  2 +-
 be/src/runtime/memory/mem_tracker_limiter.h      |  2 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp |  6 ++----
 be/src/runtime/memory/thread_mem_tracker_mgr.h   | 11 ++++-------
 be/src/runtime/runtime_state.cpp                 | 13 +++++--------
 be/src/runtime/thread_context.h                  | 12 ++++++------
 be/src/service/doris_main.cpp                    |  2 +-
 be/src/vec/common/pod_array.h                    | 10 +++++-----
 17 files changed, 52 insertions(+), 61 deletions(-)

diff --git a/be/src/exec/hash_table.cpp b/be/src/exec/hash_table.cpp
index 2aa195bebc..b50b03460e 100644
--- a/be/src/exec/hash_table.cpp
+++ b/be/src/exec/hash_table.cpp
@@ -175,7 +175,7 @@ Status HashTable::resize_buckets(int64_t num_buckets) {
 
     int64_t old_num_buckets = _num_buckets;
     int64_t delta_bytes = (num_buckets - old_num_buckets) * sizeof(Bucket);
-    Status st = 
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->check_limit(
+    Status st = 
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
             delta_bytes);
     if (!st) {
         LOG_EVERY_N(WARNING, 100) << "resize bucket failed: " << 
st.to_string();
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index 624f7c71ce..16332151b0 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -911,13 +911,13 @@ Tuple* 
PartitionedAggregationNode::ConstructIntermediateTuple(
             << "Backend: " << BackendOptions::get_localhost() << ", "
             << "fragment: " << print_id(state_->fragment_instance_id()) << " "
             << "Used: "
-            << 
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->consumption()
+            << 
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->consumption()
             << ", Limit: "
-            << 
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->limit() 
<< ". "
+            << 
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->limit() << ". 
"
             << "You can change the limit by session variable exec_mem_limit.";
         string details = Substitute(str.str(), _id, tuple_data_size);
         *status = thread_context()
-                          ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()
+                          ->_thread_mem_tracker_mgr->limiter_mem_tracker()
                           ->mem_limit_exceeded(state_, details, 
tuple_data_size);
         return nullptr;
     }
diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc
index a662764907..b78622e137 100644
--- a/be/src/exec/partitioned_hash_table.cc
+++ b/be/src/exec/partitioned_hash_table.cc
@@ -307,7 +307,7 @@ Status 
PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state,
                                      MAX_EXPR_VALUES_ARRAY_SIZE / 
expr_values_bytes_per_row_));
 
     int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, 
num_exprs_);
-    if 
(UNLIKELY(!thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->check_limit(
+    if 
(UNLIKELY(!thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
                 mem_usage))) {
         capacity_ = 0;
         string details = Substitute(
diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc
index fa2d1b70b0..e610cf3803 100644
--- a/be/src/runtime/buffered_block_mgr2.cc
+++ b/be/src/runtime/buffered_block_mgr2.cc
@@ -251,7 +251,7 @@ int64_t BufferedBlockMgr2::remaining_unreserved_buffers() 
const {
     int64_t num_buffers =
             _free_io_buffers.size() + _unpinned_blocks.size() + 
_non_local_outstanding_writes;
     num_buffers +=
-            
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->spare_capacity()
 /
+            
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->spare_capacity()
 /
             max_block_size();
     num_buffers -= _unfullfilled_reserved_buffers;
     return num_buffers;
@@ -358,9 +358,9 @@ Status BufferedBlockMgr2::get_new_block(Client* client, 
Block* unpin_block, Bloc
 
         if (len > 0 && len < _max_block_size) {
             DCHECK(unpin_block == nullptr);
-            Status st = thread_context()
-                                
->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()
-                                ->check_limit(len);
+            Status st =
+                    
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
+                            len);
             WARN_IF_ERROR(st, "get_new_block failed");
             if (st) {
                 client->_tracker->consume(len);
@@ -986,7 +986,7 @@ Status BufferedBlockMgr2::find_buffer(unique_lock<mutex>& 
lock, BufferDescriptor
 
     // First, try to allocate a new buffer.
     if (_free_io_buffers.size() < _block_write_threshold &&
-        
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->check_limit(
+        
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
                 _max_block_size)) {
         _mem_tracker->consume(_max_block_size);
         uint8_t* new_buffer = new uint8_t[_max_block_size];
@@ -1155,9 +1155,9 @@ string BufferedBlockMgr2::debug_internal() const {
        << "  Unfullfilled reserved buffers: " << 
_unfullfilled_reserved_buffers << endl
        << "  BUffer Block Mgr Used memory: " << _mem_tracker->consumption()
        << "  Instance remaining memory: "
-       << 
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->spare_capacity()
+       << 
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->spare_capacity()
        << " (#blocks="
-       << 
(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->spare_capacity()
 /
+       << 
(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->spare_capacity()
 /
            _max_block_size)
        << ")" << endl
        << "  Block write threshold: " << _block_write_threshold;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index a86889f35d..cd32b9beff 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -114,10 +114,8 @@ public:
     }
 
     std::shared_ptr<MemTrackerLimiter> process_mem_tracker() { return 
_process_mem_tracker; }
-    MemTrackerLimiter* process_mem_tracker_raw() { return 
_process_mem_tracker_raw; }
     void set_process_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& 
tracker) {
         _process_mem_tracker = tracker;
-        _process_mem_tracker_raw = tracker.get();
     }
     std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return 
_query_pool_mem_tracker; }
     std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return 
_load_pool_mem_tracker; }
@@ -196,7 +194,6 @@ private:
     // The ancestor for all trackers. Every tracker is visible from the 
process down.
     // Not limit total memory by process tracker, and it's just used to track 
virtual memory of process.
     std::shared_ptr<MemTrackerLimiter> _process_mem_tracker;
-    MemTrackerLimiter* _process_mem_tracker_raw;
     // The ancestor for all querys tracker.
     std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
     // The ancestor for all load tracker.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 0aedb39b49..0fab11f3ee 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -205,7 +205,6 @@ Status ExecEnv::_init_mem_tracker() {
     }
     _process_mem_tracker =
             std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, 
"Process");
-    _process_mem_tracker_raw = _process_mem_tracker.get();
     thread_context()->_thread_mem_tracker_mgr->init();
     thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
 #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && 
!defined(ADDRESS_SANITIZER) && \
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index 768cdccb18..6fae53bba4 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -67,7 +67,7 @@ MemPool::~MemPool() {
         ChunkAllocator::instance()->free(chunk.chunk);
     }
     THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - 
peak_allocated_bytes_,
-                                     
ExecEnv::GetInstance()->process_mem_tracker_raw());
+                                     
ExecEnv::GetInstance()->process_mem_tracker().get());
     if (_mem_tracker) _mem_tracker->release(total_bytes_released);
     
DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
 }
@@ -89,7 +89,7 @@ void MemPool::free_all() {
         ChunkAllocator::instance()->free(chunk.chunk);
     }
     THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - 
peak_allocated_bytes_,
-                                     
ExecEnv::GetInstance()->process_mem_tracker_raw());
+                                     
ExecEnv::GetInstance()->process_mem_tracker().get());
     if (_mem_tracker) _mem_tracker->release(total_bytes_released);
     chunks_.clear();
     next_chunk_size_ = INITIAL_CHUNK_SIZE;
@@ -141,7 +141,7 @@ Status MemPool::find_chunk(size_t min_size, bool 
check_limits) {
 
     chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
     if (check_limits &&
-        
!thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->check_limit(
+        
!thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
                 chunk_size)) {
         return Status::MemoryAllocFailed("MemPool find new chunk {} bytes 
faild, exceed limit",
                                          chunk_size);
@@ -150,7 +150,7 @@ Status MemPool::find_chunk(size_t min_size, bool 
check_limits) {
     // Allocate a new chunk. Return early if allocate fails.
     Chunk chunk;
     RETURN_IF_ERROR(ChunkAllocator::instance()->allocate(chunk_size, &chunk));
-    THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, 
ExecEnv::GetInstance()->process_mem_tracker_raw());
+    THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, 
ExecEnv::GetInstance()->process_mem_tracker().get());
     if (_mem_tracker) _mem_tracker->consume(chunk_size);
     ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size);
     // Put it before the first free chunk. If no free chunks, it goes at the 
end.
diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h
index 98e2d5a271..5ad56ef934 100644
--- a/be/src/runtime/mem_pool.h
+++ b/be/src/runtime/mem_pool.h
@@ -212,9 +212,9 @@ private:
     bool check_integrity(bool check_current_chunk_empty);
 
     void reset_peak() {
-        if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) {
+        if (total_allocated_bytes_ - peak_allocated_bytes_ > 4096) {
             THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - 
peak_allocated_bytes_,
-                                             
ExecEnv::GetInstance()->process_mem_tracker_raw());
+                                             
ExecEnv::GetInstance()->process_mem_tracker().get());
             peak_allocated_bytes_ = total_allocated_bytes_;
         }
     }
diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index b282b87758..5ef28ee7da 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -58,12 +58,15 @@ MemTracker::MemTracker(const std::string& label, 
RuntimeProfile* profile) {
         _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, 
TUnit::BYTES);
     }
 
-    DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() != 
nullptr);
-    _label = fmt::format(
-            "{} | {}", label,
-            
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label());
-    _bind_group_num =
-            
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->group_num();
+    if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) {
+        _label = fmt::format(
+                "{} | {}", label,
+                
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label());
+    } else {
+        _label = label + " | ";
+    }
+
+    _bind_group_num = 
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num();
     {
         std::lock_guard<std::mutex> 
l(mem_tracker_pool[_bind_group_num].group_lock);
         _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 7597ecfb39..13e888959a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -303,7 +303,7 @@ Status MemTrackerLimiter::mem_limit_exceeded(const 
std::string& msg,
         // The limit of the current tracker and parents is less than 0, the 
consume will not fail,
         // and the current process memory has no excess limit.
         detail += fmt::format("unknown exceed reason, executing_msg:<{}>", 
msg);
-        print_log_usage_tracker = 
ExecEnv::GetInstance()->process_mem_tracker_raw();
+        print_log_usage_tracker = 
ExecEnv::GetInstance()->process_mem_tracker().get();
     }
     auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
     if (print_log_usage_tracker != nullptr)
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 9cbc688b71..65506964a8 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -73,7 +73,7 @@ public:
             auto st = Status::MemoryLimitExceeded(
                     "process memory used {} exceed limit {}, 
failed_alloc_size={}",
                     PerfCounters::get_vm_rss(), MemInfo::mem_limit(), bytes);
-            
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(st.get_error_msg());
+            
ExecEnv::GetInstance()->process_mem_tracker()->print_log_usage(st.get_error_msg());
             return st;
         }
         return Status::OK();
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 5cbc76eadc..025a8248d4 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -32,7 +32,6 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
     _task_id = task_id;
     _fragment_instance_id = fragment_instance_id;
     _limiter_tracker = mem_tracker;
-    _limiter_tracker_raw = mem_tracker.get();
 }
 
 void ThreadMemTrackerMgr::detach_limiter_tracker() {
@@ -40,7 +39,6 @@ void ThreadMemTrackerMgr::detach_limiter_tracker() {
     _task_id = "";
     _fragment_instance_id = TUniqueId();
     _limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker();
-    _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw();
 }
 
 void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& 
cancel_details) {
@@ -56,9 +54,9 @@ void ThreadMemTrackerMgr::exceeded(Status 
failed_try_consume_st) {
         _cb_func();
     }
     if (is_attach_query()) {
-        auto st = _limiter_tracker_raw->mem_limit_exceeded(
+        auto st = _limiter_tracker->mem_limit_exceeded(
                 fmt::format("exec node:<{}>", last_consumer_tracker()),
-                _limiter_tracker_raw->parent().get(), failed_try_consume_st);
+                _limiter_tracker->parent().get(), failed_try_consume_st);
         exceeded_cancel_task(st.get_error_msg());
         _check_limit = false; // Make sure it will only be canceled once
     }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 13ecea46b4..5e02e2cd77 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -50,7 +50,7 @@ public:
 
     // only for tcmalloc hook
     static void consume_no_attach(int64_t size) {
-        ExecEnv::GetInstance()->process_mem_tracker_raw()->consume(size);
+        ExecEnv::GetInstance()->process_mem_tracker()->consume(size);
     }
 
     // After thread initialization, calling `init` again must call 
`clear_untracked_mems` first
@@ -84,7 +84,6 @@ public:
     bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
 
     std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return 
_limiter_tracker; }
-    MemTrackerLimiter* limiter_mem_tracker_raw() { return 
_limiter_tracker_raw; }
 
     void set_check_limit(bool check_limit) { _check_limit = check_limit; }
     void set_check_attach(bool check_attach) { _check_attach = check_attach; }
@@ -114,7 +113,6 @@ private:
     int64_t _untracked_mem = 0;
 
     std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
-    MemTrackerLimiter* _limiter_tracker_raw;
     std::vector<MemTracker*> _consumer_tracker_stack;
 
     // If true, call memtracker try_consume, otherwise call consume.
@@ -131,7 +129,6 @@ inline void ThreadMemTrackerMgr::init() {
     DCHECK(_consumer_tracker_stack.empty());
     _task_id = "";
     _limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker();
-    _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw();
     _check_limit = true;
 }
 
@@ -182,15 +179,15 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
         // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
         //        _limiter_tracker->label() != "Process");
 #endif
-        Status st = _limiter_tracker_raw->try_consume(_untracked_mem);
+        Status st = _limiter_tracker->try_consume(_untracked_mem);
         if (!st) {
             // The memory has been allocated, so when TryConsume fails, need 
to continue to complete
             // the consume to ensure the accuracy of the statistics.
-            _limiter_tracker_raw->consume(_untracked_mem);
+            _limiter_tracker->consume(_untracked_mem);
             exceeded(st);
         }
     } else {
-        _limiter_tracker_raw->consume(_untracked_mem);
+        _limiter_tracker->consume(_untracked_mem);
     }
     for (auto tracker : _consumer_tracker_stack) {
         tracker->consume(_untracked_mem);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index b07a8cda87..e8a5e2846c 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -203,14 +203,13 @@ Status RuntimeState::init(const TUniqueId& 
fragment_instance_id, const TQueryOpt
 Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
     bool has_query_mem_tracker = _query_options.__isset.mem_limit && 
(_query_options.mem_limit > 0);
     int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit : 
-1;
-    if (bytes_limit > 
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit()) {
+    if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker()->limit()) {
         VLOG_NOTICE << "Query memory limit " << 
PrettyPrinter::print(bytes_limit, TUnit::BYTES)
                     << " exceeds process memory limit of "
-                    << PrettyPrinter::print(
-                               
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(),
-                               TUnit::BYTES)
+                    << 
PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->limit(),
+                                            TUnit::BYTES)
                     << ". Using process memory limit instead";
-        bytes_limit = 
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit();
+        bytes_limit = ExecEnv::GetInstance()->process_mem_tracker()->limit();
     }
     auto mem_tracker_counter = ADD_COUNTER(&_profile, "MemoryLimit", 
TUnit::BYTES);
     mem_tracker_counter->set(bytes_limit);
@@ -300,9 +299,7 @@ Status RuntimeState::set_mem_limit_exceeded(const 
std::string& msg) {
 Status RuntimeState::check_query_state(const std::string& msg) {
     // TODO: it would be nice if this also checked for cancellation, but doing 
so breaks
     // cases where we use Status::Cancelled("Cancelled") to indicate that the 
limit was reached.
-    if (thread_context()
-                ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()
-                ->any_limit_exceeded()) {
+    if 
(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->any_limit_exceeded())
 {
         RETURN_LIMIT_EXCEEDED(this, msg);
     }
     return query_status();
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index b097910f24..1fe2f82530 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -136,7 +136,7 @@ public:
                      const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
         DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && 
_task_id == "")
                 << ",new tracker label: " << mem_tracker->label() << ",old 
tracker label: "
-                << _thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label();
+                << _thread_mem_tracker_mgr->limiter_mem_tracker()->label();
         DCHECK(type != TaskType::UNKNOWN);
         _type = type;
         _task_id = task_id;
@@ -256,15 +256,15 @@ public:
     doris::thread_context()->_thread_mem_tracker_mgr->consume(size)
 #define RELEASE_THREAD_MEM_TRACKER(size) \
     doris::thread_context()->_thread_mem_tracker_mgr->consume(-size)
-#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker)                          
               \
-    
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to(
 \
-            size, tracker)
+#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker)                          
                \
+    
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to(size,
 \
+                                                                               
          tracker)
 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
     tracker->transfer_to(                               \
-            size, 
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw())
+            size, 
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker().get())
 #define RETURN_LIMIT_EXCEEDED(state, msg, ...)                                 
             \
     return doris::thread_context()                                             
             \
-            ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()               
             \
+            ->_thread_mem_tracker_mgr->limiter_mem_tracker()                   
             \
             ->mem_limit_exceeded(                                              
             \
                     state,                                                     
             \
                     fmt::format("exec node:<{}>, {}",                          
             \
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 680778989c..f6fa80e52c 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -502,7 +502,7 @@ int main(int argc, char** argv) {
         // this will cause coredump for ASAN build when running regression 
test,
         // disable temporarily.
         
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
-        
doris::ExecEnv::GetInstance()->process_mem_tracker_raw()->enable_print_log_usage();
+        
doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage();
         sleep(1);
     }
 
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index 2f842040a6..84ced225e1 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -113,9 +113,9 @@ protected:
     }
 
     inline void reset_peak() {
-        if (UNLIKELY(c_end - c_end_peak > 65536)) {
+        if (UNLIKELY(c_end - c_end_peak > 4096)) {
             THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak,
-                                             
ExecEnv::GetInstance()->process_mem_tracker_raw());
+                                             
ExecEnv::GetInstance()->process_mem_tracker().get());
             c_end_peak = c_end;
         }
     }
@@ -127,7 +127,7 @@ protected:
     template <typename... TAllocatorParams>
     void alloc(size_t bytes, TAllocatorParams&&... allocator_params) {
         THREAD_MEM_TRACKER_TRANSFER_TO(bytes - pad_right - pad_left,
-                                       
ExecEnv::GetInstance()->process_mem_tracker_raw());
+                                       
ExecEnv::GetInstance()->process_mem_tracker().get());
         c_start = c_end = c_end_peak =
                 reinterpret_cast<char*>(TAllocator::alloc(
                         bytes, 
std::forward<TAllocatorParams>(allocator_params)...)) +
@@ -144,7 +144,7 @@ protected:
 
         TAllocator::free(c_start - pad_left, allocated_bytes());
         THREAD_MEM_TRACKER_TRANSFER_FROM(c_end_of_storage - c_end_peak,
-                                         
ExecEnv::GetInstance()->process_mem_tracker_raw());
+                                         
ExecEnv::GetInstance()->process_mem_tracker().get());
     }
 
     template <typename... TAllocatorParams>
@@ -157,7 +157,7 @@ protected:
         unprotect();
 
         THREAD_MEM_TRACKER_TRANSFER_TO(bytes - allocated_bytes(),
-                                       
ExecEnv::GetInstance()->process_mem_tracker_raw());
+                                       
ExecEnv::GetInstance()->process_mem_tracker().get());
 
         ptrdiff_t end_diff = c_end - c_start;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to