This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 73d8f5901d fix mem tracker limiter (#11376)
73d8f5901d is described below

commit 73d8f5901d09eece433dbd3d366fd2719d9d3ee7
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Mon Aug 1 09:44:04 2022 +0800

    fix mem tracker limiter (#11376)
---
 be/src/olap/base_compaction.cpp                  |  2 +-
 be/src/olap/compaction.cpp                       |  4 +-
 be/src/olap/compaction.h                         |  2 +-
 be/src/olap/cumulative_compaction.cpp            |  2 +-
 be/src/olap/delta_writer.cpp                     | 20 ++++-----
 be/src/olap/delta_writer.h                       | 12 +++---
 be/src/olap/memtable_flush_executor.cpp          |  7 ++--
 be/src/olap/memtable_flush_executor.h            |  3 +-
 be/src/olap/olap_server.cpp                      |  4 +-
 be/src/olap/storage_engine.cpp                   | 16 ++++----
 be/src/olap/storage_engine.h                     | 26 +++++++-----
 be/src/olap/task/engine_alter_tablet_task.cpp    |  4 +-
 be/src/olap/task/engine_alter_tablet_task.h      |  2 +-
 be/src/olap/task/engine_batch_load_task.cpp      |  4 +-
 be/src/olap/task/engine_batch_load_task.h        |  2 +-
 be/src/olap/task/engine_checksum_task.cpp        |  4 +-
 be/src/olap/task/engine_checksum_task.h          |  2 +-
 be/src/olap/task/engine_clone_task.cpp           |  4 +-
 be/src/olap/task/engine_clone_task.h             |  2 +-
 be/src/runtime/data_stream_mgr.cpp               |  6 +--
 be/src/runtime/data_stream_recvr.cc              |  5 +--
 be/src/runtime/data_stream_recvr.h               |  5 +--
 be/src/runtime/exec_env.h                        | 16 ++++----
 be/src/runtime/exec_env_init.cpp                 | 12 +++---
 be/src/runtime/load_channel.cpp                  |  6 +--
 be/src/runtime/load_channel.h                    |  4 +-
 be/src/runtime/load_channel_mgr.cpp              | 10 ++---
 be/src/runtime/load_channel_mgr.h                |  2 +-
 be/src/runtime/memory/mem_tracker_limiter.cpp    | 52 ++++++++++++------------
 be/src/runtime/memory/mem_tracker_limiter.h      | 18 ++++----
 be/src/runtime/memory/mem_tracker_task_pool.cpp  | 46 +++++++++------------
 be/src/runtime/memory/mem_tracker_task_pool.h    | 26 ++++++------
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 11 +++--
 be/src/runtime/memory/thread_mem_tracker_mgr.h   |  6 +--
 be/src/runtime/runtime_state.cpp                 |  5 ++-
 be/src/runtime/runtime_state.h                   |  8 ++--
 be/src/runtime/sorted_run_merger.cc              |  2 +-
 be/src/runtime/tablets_channel.cpp               |  7 ++--
 be/src/runtime/tablets_channel.h                 |  7 ++--
 be/src/runtime/thread_context.cpp                |  5 ++-
 be/src/runtime/thread_context.h                  |  5 ++-
 be/src/service/internal_service.cpp              | 16 ++++----
 be/src/vec/runtime/vdata_stream_mgr.cpp          |  4 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp        |  5 +--
 be/src/vec/runtime/vdata_stream_recvr.h          |  6 +--
 be/test/runtime/mem_limit_test.cpp               | 12 +++---
 be/test/testutil/run_all_tests.cpp               |  3 +-
 47 files changed, 220 insertions(+), 212 deletions(-)

diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 307fa2fee0..2e8d1f82aa 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -63,7 +63,7 @@ Status BaseCompaction::execute_compact_impl() {
         return Status::OLAPInternalError(OLAP_ERR_BE_CLONE_OCCURRED);
     }
 
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), 
ThreadContext::TaskType::COMPACTION);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION);
 
     // 2. do base compaction, merge rowsets
     int64_t permits = get_compaction_permits();
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 9a0e50d591..7ac55ab15e 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -34,10 +34,10 @@ Compaction::Compaction(TabletSharedPtr tablet, const 
std::string& label)
           _input_row_num(0),
           _state(CompactionState::INITED) {
 #ifndef BE_TEST
-    _mem_tracker = std::make_unique<MemTrackerLimiter>(
+    _mem_tracker = std::make_shared<MemTrackerLimiter>(
             -1, label, StorageEngine::instance()->compaction_mem_tracker());
 #else
-    _mem_tracker = std::make_unique<MemTrackerLimiter>(-1, label);
+    _mem_tracker = std::make_shared<MemTrackerLimiter>(-1, label);
 #endif
 }
 
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 7d7a43bfd3..d73bfa9dba 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -76,7 +76,7 @@ protected:
 
 protected:
     // the root tracker for this compaction
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 
     TabletSharedPtr _tablet;
 
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index 9fc9fd8b62..ba52fbfdcb 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -70,7 +70,7 @@ Status CumulativeCompaction::execute_compact_impl() {
         return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED);
     }
 
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), 
ThreadContext::TaskType::COMPACTION);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION);
 
     // 3. do cumulative compaction, merge rowsets
     int64_t permits = get_compaction_permits();
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index e50a8567b4..60a096af28 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -30,14 +30,14 @@
 
 namespace doris {
 
-Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, 
MemTrackerLimiter* parent_tracker,
-                         bool is_vec) {
+Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer,
+                         const std::shared_ptr<MemTrackerLimiter>& 
parent_tracker, bool is_vec) {
     *writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker, 
is_vec);
     return Status::OK();
 }
 
 DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
-                         MemTrackerLimiter* parent_tracker, bool is_vec)
+                         const std::shared_ptr<MemTrackerLimiter>& 
parent_tracker, bool is_vec)
         : _req(*req),
           _tablet(nullptr),
           _cur_rowset(nullptr),
@@ -98,9 +98,9 @@ Status DeltaWriter::init() {
                      << ", schema_hash=" << _req.schema_hash;
         return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
     }
-    _mem_tracker = std::make_unique<MemTrackerLimiter>(
+    _mem_tracker = std::make_shared<MemTrackerLimiter>(
             -1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()), 
_parent_tracker);
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
     // check tablet version number
     if (_tablet->version_count() > config::max_tablet_version_num) {
         //trigger quick compaction
@@ -208,7 +208,7 @@ Status DeltaWriter::write(const vectorized::Block* block, 
const std::vector<int>
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
     _mem_table->insert(block, row_idxs);
 
     if (_mem_table->need_to_agg()) {
@@ -226,7 +226,7 @@ Status DeltaWriter::_flush_memtable_async() {
     if (++_segment_counter > config::max_segment_num_per_rowset) {
         return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
     }
-    return _flush_token->submit(std::move(_mem_table), _mem_tracker.get());
+    return _flush_token->submit(std::move(_mem_table), _mem_tracker);
 }
 
 Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
@@ -243,7 +243,7 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) 
{
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
     if (mem_consumption() == _mem_table->memory_usage()) {
         // equal means there is no memtable in flush queue, just flush this 
memtable
         VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable 
size: "
@@ -297,7 +297,7 @@ Status DeltaWriter::close() {
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
     RETURN_NOT_OK(_flush_memtable_async());
     _mem_table.reset();
     return Status::OK();
@@ -312,7 +312,7 @@ Status DeltaWriter::close_wait() {
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
     // return error if previous flush failed
     RETURN_NOT_OK(_flush_token->wait());
 
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 1ce62de338..573b786402 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -56,7 +56,9 @@ struct WriteRequest {
 class DeltaWriter {
 public:
     static Status open(WriteRequest* req, DeltaWriter** writer,
-                       MemTrackerLimiter* parent_tracker = nullptr, bool 
is_vec = false);
+                       const std::shared_ptr<MemTrackerLimiter>& 
parent_tracker =
+                               std::shared_ptr<MemTrackerLimiter>(),
+                       bool is_vec = false);
 
     ~DeltaWriter();
 
@@ -101,8 +103,8 @@ public:
     int64_t get_mem_consumption_snapshot() const;
 
 private:
-    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, 
MemTrackerLimiter* parent_tracker,
-                bool is_vec);
+    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
+                const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool 
is_vec);
 
     // push a full memtable to flush executor
     Status _flush_memtable_async();
@@ -133,8 +135,8 @@ private:
 
     StorageEngine* _storage_engine;
     std::unique_ptr<FlushToken> _flush_token;
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
-    MemTrackerLimiter* _parent_tracker;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _parent_tracker;
 
     // The counter of number of segment flushed already.
     int64_t _segment_counter = 0;
diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index b9e622a9a7..53fc5ac2f7 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -29,7 +29,7 @@ namespace doris {
 class MemtableFlushTask final : public Runnable {
 public:
     MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> 
memtable,
-                      int64_t submit_task_time, MemTrackerLimiter* tracker)
+                      int64_t submit_task_time, const 
std::shared_ptr<MemTrackerLimiter>& tracker)
             : _flush_token(flush_token),
               _memtable(std::move(memtable)),
               _submit_task_time(submit_task_time),
@@ -47,7 +47,7 @@ private:
     FlushToken* _flush_token;
     std::unique_ptr<MemTable> _memtable;
     int64_t _submit_task_time;
-    MemTrackerLimiter* _tracker;
+    std::shared_ptr<MemTrackerLimiter> _tracker;
 };
 
 std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
@@ -58,7 +58,8 @@ std::ostream& operator<<(std::ostream& os, const 
FlushStatistic& stat) {
     return os;
 }
 
-Status FlushToken::submit(std::unique_ptr<MemTable> mem_table, 
MemTrackerLimiter* tracker) {
+Status FlushToken::submit(std::unique_ptr<MemTable> mem_table,
+                          const std::shared_ptr<MemTrackerLimiter>& tracker) {
     ErrorCode s = _flush_status.load();
     if (s != OLAP_SUCCESS) {
         return Status::OLAPInternalError(s);
diff --git a/be/src/olap/memtable_flush_executor.h 
b/be/src/olap/memtable_flush_executor.h
index 6f986af3f5..4003126d58 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -57,7 +57,8 @@ public:
     explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
             : _flush_token(std::move(flush_pool_token)), 
_flush_status(OLAP_SUCCESS) {}
 
-    Status submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* 
tracker);
+    Status submit(std::unique_ptr<MemTable> mem_table,
+                  const std::shared_ptr<MemTrackerLimiter>& tracker);
 
     // error has happpens, so we cancel this token
     // And remove all tasks in the queue.
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index ef53eeaee4..cb080a0e70 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -112,7 +112,7 @@ Status StorageEngine::start_bg_threads() {
             RETURN_IF_ERROR(Thread::create(
                     "StorageEngine", "path_scan_thread",
                     [this, data_dir]() {
-                        SCOPED_ATTACH_TASK(_mem_tracker.get(), 
ThreadContext::TaskType::STORAGE);
+                        SCOPED_ATTACH_TASK(_mem_tracker, 
ThreadContext::TaskType::STORAGE);
                         this->_path_scan_thread_callback(data_dir);
                     },
                     &path_scan_thread));
@@ -122,7 +122,7 @@ Status StorageEngine::start_bg_threads() {
             RETURN_IF_ERROR(Thread::create(
                     "StorageEngine", "path_gc_thread",
                     [this, data_dir]() {
-                        SCOPED_ATTACH_TASK(_mem_tracker.get(), 
ThreadContext::TaskType::STORAGE);
+                        SCOPED_ATTACH_TASK(_mem_tracker, 
ThreadContext::TaskType::STORAGE);
                         this->_path_gc_thread_callback(data_dir);
                     },
                     &path_gc_thread));
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 4e28ee7d8b..f239f2a585 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -113,16 +113,16 @@ StorageEngine::StorageEngine(const EngineOptions& options)
           _index_stream_lru_cache(nullptr),
           _file_cache(nullptr),
           _compaction_mem_tracker(
-                  std::make_unique<MemTrackerLimiter>(-1, 
"StorageEngine::AutoCompaction")),
+                  std::make_shared<MemTrackerLimiter>(-1, 
"StorageEngine::AutoCompaction")),
           
_segment_meta_mem_tracker(std::make_unique<MemTracker>("StorageEngine::SegmentMeta")),
           _schema_change_mem_tracker(
-                  std::make_unique<MemTrackerLimiter>(-1, 
"StorageEngine::SchemaChange")),
-          _clone_mem_tracker(std::make_unique<MemTrackerLimiter>(-1, 
"StorageEngine::Clone")),
+                  std::make_shared<MemTrackerLimiter>(-1, 
"StorageEngine::SchemaChange")),
+          _clone_mem_tracker(std::make_shared<MemTrackerLimiter>(-1, 
"StorageEngine::Clone")),
           _batch_load_mem_tracker(
-                  std::make_unique<MemTrackerLimiter>(-1, 
"StorageEngine::BatchLoad")),
+                  std::make_shared<MemTrackerLimiter>(-1, 
"StorageEngine::BatchLoad")),
           _consistency_mem_tracker(
-                  std::make_unique<MemTrackerLimiter>(-1, 
"StorageEngine::Consistency")),
-          _mem_tracker(std::make_unique<MemTrackerLimiter>(-1, 
"StorageEngine::Self")),
+                  std::make_shared<MemTrackerLimiter>(-1, 
"StorageEngine::Consistency")),
+          _mem_tracker(std::make_shared<MemTrackerLimiter>(-1, 
"StorageEngine::Self")),
           _stop_background_threads_latch(1),
           _tablet_manager(new TabletManager(config::tablet_map_shard_size)),
           _txn_manager(new TxnManager(config::txn_map_shard_size, 
config::txn_shard_size)),
@@ -168,7 +168,7 @@ void StorageEngine::load_data_dirs(const 
std::vector<DataDir*>& data_dirs) {
     std::vector<std::thread> threads;
     for (auto data_dir : data_dirs) {
         threads.emplace_back([this, data_dir] {
-            SCOPED_ATTACH_TASK(_mem_tracker.get(), 
ThreadContext::TaskType::STORAGE);
+            SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
             auto res = data_dir->load();
             if (!res.ok()) {
                 LOG(WARNING) << "io error when init load tables. res=" << res
@@ -220,7 +220,7 @@ Status StorageEngine::_init_store_map() {
                                      _tablet_manager.get(), 
_txn_manager.get());
         tmp_stores.emplace_back(store);
         threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
-            SCOPED_ATTACH_TASK(_mem_tracker.get(), 
ThreadContext::TaskType::STORAGE);
+            SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
             auto st = store->init();
             if (!st.ok()) {
                 {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index d508a617c5..a46218e442 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -179,12 +179,16 @@ public:
 
     Status get_compaction_status_json(std::string* result);
 
-    MemTrackerLimiter* compaction_mem_tracker() { return 
_compaction_mem_tracker.get(); }
+    std::shared_ptr<MemTrackerLimiter> compaction_mem_tracker() { return 
_compaction_mem_tracker; }
     MemTracker* segment_meta_mem_tracker() { return 
_segment_meta_mem_tracker.get(); }
-    MemTrackerLimiter* schema_change_mem_tracker() { return 
_schema_change_mem_tracker.get(); }
-    MemTrackerLimiter* clone_mem_tracker() { return _clone_mem_tracker.get(); }
-    MemTrackerLimiter* batch_load_mem_tracker() { return 
_batch_load_mem_tracker.get(); }
-    MemTrackerLimiter* consistency_mem_tracker() { return 
_consistency_mem_tracker.get(); }
+    std::shared_ptr<MemTrackerLimiter> schema_change_mem_tracker() {
+        return _schema_change_mem_tracker;
+    }
+    std::shared_ptr<MemTrackerLimiter> clone_mem_tracker() { return 
_clone_mem_tracker; }
+    std::shared_ptr<MemTrackerLimiter> batch_load_mem_tracker() { return 
_batch_load_mem_tracker; }
+    std::shared_ptr<MemTrackerLimiter> consistency_mem_tracker() {
+        return _consistency_mem_tracker;
+    }
 
     // check cumulative compaction config
     void check_cumulative_compaction_config();
@@ -333,21 +337,21 @@ private:
     std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
 
     // Count the memory consumption of all Base and Cumulative tasks.
-    std::unique_ptr<MemTrackerLimiter> _compaction_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _compaction_mem_tracker;
     // This mem tracker is only for tracking memory use by segment meta data 
such as footer or index page.
     // The memory consumed by querying is tracked in segment iterator.
     std::unique_ptr<MemTracker> _segment_meta_mem_tracker;
     // Count the memory consumption of all SchemaChange tasks.
-    std::unique_ptr<MemTrackerLimiter> _schema_change_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _schema_change_mem_tracker;
     // Count the memory consumption of all EngineCloneTask.
     // Note: Memory that does not contain make/release snapshots.
-    std::unique_ptr<MemTrackerLimiter> _clone_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _clone_mem_tracker;
     // Count the memory consumption of all EngineBatchLoadTask.
-    std::unique_ptr<MemTrackerLimiter> _batch_load_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _batch_load_mem_tracker;
     // Count the memory consumption of all EngineChecksumTask.
-    std::unique_ptr<MemTrackerLimiter> _consistency_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _consistency_mem_tracker;
     // StorageEngine oneself
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 
     CountDownLatch _stop_background_threads_latch;
     scoped_refptr<Thread> _unused_rowset_monitor_thread;
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp 
b/be/src/olap/task/engine_alter_tablet_task.cpp
index ce34cac4d3..7689df96b3 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -25,7 +25,7 @@ namespace doris {
 
 EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
         : _alter_tablet_req(request) {
-    _mem_tracker = std::make_unique<MemTrackerLimiter>(
+    _mem_tracker = std::make_shared<MemTrackerLimiter>(
             config::memory_limitation_per_thread_for_schema_change_bytes,
             fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
                         std::to_string(_alter_tablet_req.base_tablet_id),
@@ -34,7 +34,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const 
TAlterTabletReqV2& request)
 }
 
 Status EngineAlterTabletTask::execute() {
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
     DorisMetrics::instance()->create_rollup_requests_total->increment(1);
 
     Status res = 
SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req);
diff --git a/be/src/olap/task/engine_alter_tablet_task.h 
b/be/src/olap/task/engine_alter_tablet_task.h
index b6a736b357..1054c55eec 100644
--- a/be/src/olap/task/engine_alter_tablet_task.h
+++ b/be/src/olap/task/engine_alter_tablet_task.h
@@ -36,7 +36,7 @@ public:
 private:
     const TAlterTabletReqV2& _alter_tablet_req;
 
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 }; // EngineTask
 
 } // namespace doris
diff --git a/be/src/olap/task/engine_batch_load_task.cpp 
b/be/src/olap/task/engine_batch_load_task.cpp
index e98ad02471..c478adcd8c 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -53,7 +53,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, 
std::vector<TTablet
           _signature(signature),
           _res_status(res_status) {
     _download_status = Status::OK();
-    _mem_tracker = std::make_unique<MemTrackerLimiter>(
+    _mem_tracker = std::make_shared<MemTrackerLimiter>(
             -1,
             fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", 
_push_req.push_type,
                         std::to_string(_push_req.tablet_id)),
@@ -63,7 +63,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, 
std::vector<TTablet
 EngineBatchLoadTask::~EngineBatchLoadTask() {}
 
 Status EngineBatchLoadTask::execute() {
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
     Status status = Status::OK();
     if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == 
TPushType::LOAD_V2) {
         status = _init();
diff --git a/be/src/olap/task/engine_batch_load_task.h 
b/be/src/olap/task/engine_batch_load_task.h
index f0bf0dba0d..e2aba71f00 100644
--- a/be/src/olap/task/engine_batch_load_task.h
+++ b/be/src/olap/task/engine_batch_load_task.h
@@ -76,7 +76,7 @@ private:
     Status* _res_status;
     std::string _remote_file_path;
     std::string _local_file_path;
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 }; // class EngineBatchLoadTask
 } // namespace doris
 #endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_BATCH_LOAD_TASK_H
diff --git a/be/src/olap/task/engine_checksum_task.cpp 
b/be/src/olap/task/engine_checksum_task.cpp
index f5d5f317f1..fc17c2fad3 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -26,13 +26,13 @@ namespace doris {
 EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash 
schema_hash,
                                        TVersion version, uint32_t* checksum)
         : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), 
_checksum(checksum) {
-    _mem_tracker = std::make_unique<MemTrackerLimiter>(
+    _mem_tracker = std::make_shared<MemTrackerLimiter>(
             -1, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id),
             StorageEngine::instance()->consistency_mem_tracker());
 }
 
 Status EngineChecksumTask::execute() {
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
     return _compute_checksum();
 } // execute
 
diff --git a/be/src/olap/task/engine_checksum_task.h 
b/be/src/olap/task/engine_checksum_task.h
index 233c6e84f2..04afa1a5cd 100644
--- a/be/src/olap/task/engine_checksum_task.h
+++ b/be/src/olap/task/engine_checksum_task.h
@@ -44,7 +44,7 @@ private:
     TSchemaHash _schema_hash;
     TVersion _version;
     uint32_t* _checksum;
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 }; // EngineTask
 
 } // namespace doris
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 73466cb72e..429a3d34be 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -57,14 +57,14 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& 
clone_req, const TMasterInfo&
           _res_status(res_status),
           _signature(signature),
           _master_info(master_info) {
-    _mem_tracker = std::make_unique<MemTrackerLimiter>(
+    _mem_tracker = std::make_shared<MemTrackerLimiter>(
             -1, "EngineCloneTask#tabletId=" + 
std::to_string(_clone_req.tablet_id),
             StorageEngine::instance()->clone_mem_tracker());
 }
 
 Status EngineCloneTask::execute() {
     // register the tablet to avoid it is deleted by gc thread during clone 
process
-    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
     
StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id);
     Status st = _do_clone();
     
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
diff --git a/be/src/olap/task/engine_clone_task.h 
b/be/src/olap/task/engine_clone_task.h
index ccb8a19581..90c4b43596 100644
--- a/be/src/olap/task/engine_clone_task.h
+++ b/be/src/olap/task/engine_clone_task.h
@@ -79,7 +79,7 @@ private:
     const TMasterInfo& _master_info;
     int64_t _copy_size;
     int64_t _copy_time_ms;
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 }; // EngineTask
 
 } // namespace doris
diff --git a/be/src/runtime/data_stream_mgr.cpp 
b/be/src/runtime/data_stream_mgr.cpp
index 3e519f8987..b0d1dbd8f2 100644
--- a/be/src/runtime/data_stream_mgr.cpp
+++ b/be/src/runtime/data_stream_mgr.cpp
@@ -72,9 +72,9 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(
     DCHECK(profile != nullptr);
     VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
               << ", node=" << dest_node_id;
-    shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(
-            this, row_desc, state->query_mem_tracker(), fragment_instance_id, 
dest_node_id,
-            num_senders, is_merging, buffer_size, profile, 
sub_plan_query_statistics_recvr));
+    shared_ptr<DataStreamRecvr> recvr(
+            new DataStreamRecvr(this, row_desc, fragment_instance_id, 
dest_node_id, num_senders,
+                                is_merging, buffer_size, profile, 
sub_plan_query_statistics_recvr));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
     lock_guard<mutex> l(_lock);
     _fragment_stream_set.insert(std::make_pair(fragment_instance_id, 
dest_node_id));
diff --git a/be/src/runtime/data_stream_recvr.cc 
b/be/src/runtime/data_stream_recvr.cc
index 0d456ca74a..b7988178fb 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -447,9 +447,8 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* 
transfer_batch) {
 
 DataStreamRecvr::DataStreamRecvr(
         DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
-        MemTrackerLimiter* query_mem_tracker, const TUniqueId& 
fragment_instance_id,
-        PlanNodeId dest_node_id, int num_senders, bool is_merging, int 
total_buffer_limit,
-        RuntimeProfile* profile,
+        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
+        bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
         std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
         : _mgr(stream_mgr),
           _fragment_instance_id(fragment_instance_id),
diff --git a/be/src/runtime/data_stream_recvr.h 
b/be/src/runtime/data_stream_recvr.h
index 31af9f2e37..efb036b5dd 100644
--- a/be/src/runtime/data_stream_recvr.h
+++ b/be/src/runtime/data_stream_recvr.h
@@ -116,9 +116,8 @@ private:
     class SenderQueue;
 
     DataStreamRecvr(DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
-                    MemTrackerLimiter* query_mem_tracker, const TUniqueId& 
fragment_instance_id,
-                    PlanNodeId dest_node_id, int num_senders, bool is_merging,
-                    int total_buffer_limit, RuntimeProfile* profile,
+                    const TUniqueId& fragment_instance_id, PlanNodeId 
dest_node_id, int num_senders,
+                    bool is_merging, int total_buffer_limit, RuntimeProfile* 
profile,
                     std::shared_ptr<QueryStatisticsRecvr> 
sub_plan_query_statistics_recvr);
 
     // If receive queue is full, done is enqueue pending, and return with 
*done is nullptr
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index d879b417e0..f708f4b410 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -113,10 +113,12 @@ public:
         return nullptr;
     }
 
-    MemTrackerLimiter* process_mem_tracker() { return _process_mem_tracker; }
-    void set_process_mem_tracker(MemTrackerLimiter* tracker) { 
_process_mem_tracker = tracker; }
-    MemTrackerLimiter* query_pool_mem_tracker() { return 
_query_pool_mem_tracker; }
-    MemTrackerLimiter* load_pool_mem_tracker() { return 
_load_pool_mem_tracker; }
+    std::shared_ptr<MemTrackerLimiter> process_mem_tracker() { return 
_process_mem_tracker; }
+    void set_process_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& 
tracker) {
+        _process_mem_tracker = tracker;
+    }
+    std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return 
_query_pool_mem_tracker; }
+    std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return 
_load_pool_mem_tracker; }
     MemTrackerTaskPool* task_pool_mem_tracker_registry() { return 
_task_pool_mem_tracker_registry; }
     ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
     PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
@@ -184,11 +186,11 @@ private:
 
     // The ancestor for all trackers. Every tracker is visible from the 
process down.
     // Not limit total memory by process tracker, and it's just used to track 
virtual memory of process.
-    MemTrackerLimiter* _process_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _process_mem_tracker;
     // The ancestor for all querys tracker.
-    MemTrackerLimiter* _query_pool_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
     // The ancestor for all load tracker.
-    MemTrackerLimiter* _load_pool_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _load_pool_mem_tracker;
     MemTrackerTaskPool* _task_pool_mem_tracker_registry;
 
     // The following two thread pools are used in different scenarios.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index afecca4159..e9074af172 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -193,7 +193,8 @@ Status ExecEnv::_init_mem_tracker() {
                      << ". Using physical memory instead";
         global_memory_limit_bytes = MemInfo::physical_mem();
     }
-    _process_mem_tracker = new MemTrackerLimiter(global_memory_limit_bytes, 
"Process");
+    _process_mem_tracker =
+            std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, 
"Process");
     thread_context()->_thread_mem_tracker_mgr->init();
     thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
 #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && 
!defined(ADDRESS_SANITIZER) && \
@@ -203,10 +204,12 @@ Status ExecEnv::_init_mem_tracker() {
     }
 #endif
 
-    _query_pool_mem_tracker = new MemTrackerLimiter(-1, "QueryPool", 
_process_mem_tracker);
+    _query_pool_mem_tracker =
+            std::make_shared<MemTrackerLimiter>(-1, "QueryPool", 
_process_mem_tracker);
     REGISTER_HOOK_METRIC(query_mem_consumption,
                          [this]() { return 
_query_pool_mem_tracker->consumption(); });
-    _load_pool_mem_tracker = new MemTrackerLimiter(-1, "LoadPool", 
_process_mem_tracker);
+    _load_pool_mem_tracker =
+            std::make_shared<MemTrackerLimiter>(-1, "LoadPool", 
_process_mem_tracker);
     REGISTER_HOOK_METRIC(load_mem_consumption,
                          [this]() { return 
_load_pool_mem_tracker->consumption(); });
     LOG(INFO) << "Using global memory limit: "
@@ -363,9 +366,6 @@ void ExecEnv::_destroy() {
     SAFE_DELETE(_routine_load_task_executor);
     SAFE_DELETE(_external_scan_context_mgr);
     SAFE_DELETE(_heartbeat_flags);
-    SAFE_DELETE(_process_mem_tracker);
-    SAFE_DELETE(_query_pool_mem_tracker);
-    SAFE_DELETE(_load_pool_mem_tracker);
     SAFE_DELETE(_task_pool_mem_tracker_registry);
     SAFE_DELETE(_buffer_reservation);
 
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index e1fc0a3463..56b21b7cef 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,11 +25,11 @@
 
 namespace doris {
 
-LoadChannel::LoadChannel(const UniqueId& load_id, 
std::unique_ptr<MemTrackerLimiter> mem_tracker,
+LoadChannel::LoadChannel(const UniqueId& load_id, 
std::shared_ptr<MemTrackerLimiter>& mem_tracker,
                          int64_t timeout_s, bool is_high_priority, const 
std::string& sender_ip,
                          bool is_vec)
         : _load_id(load_id),
-          _mem_tracker(std::move(mem_tracker)),
+          _mem_tracker(mem_tracker),
           _timeout_s(timeout_s),
           _is_high_priority(is_high_priority),
           _sender_ip(sender_ip),
@@ -60,7 +60,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& 
params) {
         } else {
             // create a new tablets channel
             TabletsChannelKey key(params.id(), index_id);
-            channel.reset(new TabletsChannel(key, _mem_tracker.get(), 
_is_high_priority, _is_vec));
+            channel.reset(new TabletsChannel(key, _mem_tracker, 
_is_high_priority, _is_vec));
             _tablets_channels.insert({index_id, channel});
         }
     }
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index ad8a476fcf..9647528304 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,7 +39,7 @@ class Cache;
 // corresponding to a certain load job
 class LoadChannel {
 public:
-    LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTrackerLimiter> 
mem_tracker,
+    LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>& 
mem_tracker,
                 int64_t timeout_s, bool is_high_priority, const std::string& 
sender_ip,
                 bool is_vec);
     ~LoadChannel();
@@ -99,7 +99,7 @@ private:
 
     UniqueId _load_id;
     // Tracks the total memory consumed by current load job on this BE
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 
     // lock protect the tablets channel map
     std::mutex _lock;
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 7429c09c6e..c252fe2aff 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -84,7 +84,7 @@ LoadChannelMgr::~LoadChannelMgr() {
 
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
     int64_t load_mgr_mem_limit = 
calc_process_max_load_memory(process_mem_limit);
-    _mem_tracker = std::make_unique<MemTrackerLimiter>(load_mgr_mem_limit, 
"LoadChannelMgr");
+    _mem_tracker = std::make_shared<MemTrackerLimiter>(load_mgr_mem_limit, 
"LoadChannelMgr");
     REGISTER_HOOK_METRIC(load_channel_mem_consumption,
                          [this]() { return _mem_tracker->consumption(); });
     _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
@@ -110,13 +110,13 @@ Status LoadChannelMgr::open(const 
PTabletWriterOpenRequest& params) {
             int64_t load_mem_limit = params.has_load_mem_limit() ? 
params.load_mem_limit() : -1;
             int64_t channel_mem_limit =
                     calc_channel_max_load_memory(load_mem_limit, 
_mem_tracker->limit());
-            auto channel_mem_tracker = std::make_unique<MemTrackerLimiter>(
+            auto channel_mem_tracker = std::make_shared<MemTrackerLimiter>(
                     channel_mem_limit,
                     fmt::format("LoadChannel#senderIp={}#loadID={}", 
params.sender_ip(),
                                 load_id.to_string()),
-                    _mem_tracker.get());
-            channel.reset(new LoadChannel(load_id, 
std::move(channel_mem_tracker),
-                                          channel_timeout_s, is_high_priority, 
params.sender_ip(),
+                    _mem_tracker);
+            channel.reset(new LoadChannel(load_id, channel_mem_tracker, 
channel_timeout_s,
+                                          is_high_priority, params.sender_ip(),
                                           params.is_vectorized()));
             _load_channels.insert({load_id, channel});
         }
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index 0e46d8bf52..af9d3d6240 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -78,7 +78,7 @@ protected:
     Cache* _last_success_channel = nullptr;
 
     // check the total load channel mem consumption of this Backend
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 
     CountDownLatch _stop_background_threads_latch;
     // thread to clean timeout load channels
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 2223fce5a5..6a37b1df86 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -19,6 +19,8 @@
 
 #include <fmt/format.h>
 
+#include <boost/stacktrace.hpp>
+
 #include "gutil/once.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
@@ -29,7 +31,8 @@
 namespace doris {
 
 MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& 
label,
-                                     MemTrackerLimiter* parent, 
RuntimeProfile* profile)
+                                     const std::shared_ptr<MemTrackerLimiter>& 
parent,
+                                     RuntimeProfile* profile)
         : MemTracker(label, profile, true) {
     DCHECK_GE(byte_limit, -1);
     _limit = byte_limit;
@@ -42,32 +45,28 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, 
const std::string& labe
     while (tracker != nullptr) {
         _all_ancestors.push_back(tracker);
         if (tracker->has_limit()) _limited_ancestors.push_back(tracker);
-        tracker = tracker->_parent;
+        tracker = tracker->_parent.get();
     }
     DCHECK_GT(_all_ancestors.size(), 0);
     DCHECK_EQ(_all_ancestors[0], this);
-    if (_parent) _parent->add_child(this);
+    if (_parent) {
+        std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
+        _child_tracker_it = _parent->_child_tracker_limiters.insert(
+                _parent->_child_tracker_limiters.end(), this);
+        _had_child_count++;
+    }
 }
 
 MemTrackerLimiter::~MemTrackerLimiter() {
     // TCMalloc hook will be triggered during destructor memtracker, may cause 
crash.
     if (_label == "Process") doris::thread_context_ptr._init = false;
     DCHECK(remain_child_count() == 0 || _label == "Process");
-    if (_parent) _parent->remove_child(this);
-}
-
-void MemTrackerLimiter::add_child(MemTrackerLimiter* tracker) {
-    std::lock_guard<std::mutex> l(_child_tracker_limiter_lock);
-    tracker->_child_tracker_it =
-            _child_tracker_limiters.insert(_child_tracker_limiters.end(), 
tracker);
-    _had_child_count++;
-}
-
-void MemTrackerLimiter::remove_child(MemTrackerLimiter* tracker) {
-    std::lock_guard<std::mutex> l(_child_tracker_limiter_lock);
-    if (tracker->_child_tracker_it != _child_tracker_limiters.end()) {
-        _child_tracker_limiters.erase(tracker->_child_tracker_it);
-        tracker->_child_tracker_it = _child_tracker_limiters.end();
+    if (_parent) {
+        std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
+        if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
+            _parent->_child_tracker_limiters.erase(_child_tracker_it);
+            _child_tracker_it = _parent->_child_tracker_limiters.end();
+        }
     }
 }
 
@@ -221,13 +220,14 @@ std::string MemTrackerLimiter::log_usage(int 
max_recursive_depth,
 Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const 
std::string& details,
                                              int64_t failed_allocation_size, 
Status failed_alloc) {
     STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
-    MemTrackerLimiter* process_tracker = 
ExecEnv::GetInstance()->process_mem_tracker();
     std::string detail =
             "Memory exceed limit. fragment={}, details={}, on backend={}. 
Memory left in process "
             "limit={}.";
-    detail = fmt::format(detail, state != nullptr ? 
print_id(state->fragment_instance_id()) : "",
-                         details, BackendOptions::get_localhost(),
-                         
PrettyPrinter::print(process_tracker->spare_capacity(), TUnit::BYTES));
+    detail = fmt::format(
+            detail, state != nullptr ? print_id(state->fragment_instance_id()) 
: "", details,
+            BackendOptions::get_localhost(),
+            
PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity(),
+                                 TUnit::BYTES));
     if (!failed_alloc) {
         detail += " failed alloc=<{}>. current tracker={}.";
         detail = fmt::format(detail, failed_alloc.to_string(), _label);
@@ -240,13 +240,15 @@ Status 
MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::str
     Status status = Status::MemoryLimitExceeded(detail);
     if (state != nullptr) state->log_error(detail);
 
+    detail += "\n" + 
boost::stacktrace::to_string(boost::stacktrace::stacktrace());
     // only print the tracker log_usage in be log.
-    if (process_tracker->spare_capacity() < failed_allocation_size) {
+    if (ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity() < 
failed_allocation_size) {
         // Dumping the process MemTracker is expensive. Limiting the recursive 
depth to two
         // levels limits the level of detail to a one-line summary for each 
query MemTracker.
-        detail += "\n" + process_tracker->log_usage(2);
+        detail += "\n" + 
ExecEnv::GetInstance()->process_mem_tracker()->log_usage(2);
+    } else {
+        detail += "\n" + log_usage();
     }
-    detail += "\n" + log_usage();
 
     LOG(WARNING) << detail;
     return status;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 5c41ce7cda..c85205b2c0 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -42,8 +42,10 @@ class RuntimeState;
 class MemTrackerLimiter final : public MemTracker {
 public:
     // Creates and adds the tracker limiter to the tree
-    MemTrackerLimiter(int64_t byte_limit = -1, const std::string& label = 
std::string(),
-                      MemTrackerLimiter* parent = nullptr, RuntimeProfile* 
profile = nullptr);
+    MemTrackerLimiter(
+            int64_t byte_limit = -1, const std::string& label = std::string(),
+            const std::shared_ptr<MemTrackerLimiter>& parent = 
std::shared_ptr<MemTrackerLimiter>(),
+            RuntimeProfile* profile = nullptr);
 
     // If the final consumption is not as expected, this usually means that 
the same memory is calling
     // consume and release on different trackers. If the two trackers have a 
parent-child relationship,
@@ -51,10 +53,7 @@ public:
     // no parent-child relationship, the two tracker consumptions are wrong.
     ~MemTrackerLimiter();
 
-    MemTrackerLimiter* parent() const { return _parent; }
-
-    void add_child(MemTrackerLimiter* tracker);
-    void remove_child(MemTrackerLimiter* tracker);
+    std::shared_ptr<MemTrackerLimiter> parent() const { return _parent; }
 
     size_t remain_child_count() const { return _child_tracker_limiters.size(); 
}
     size_t had_child_count() const { return _had_child_count; }
@@ -187,7 +186,7 @@ private:
     // Group number in MemTracker::mem_tracker_pool, generated by the 
timestamp.
     int64_t _group_num;
 
-    MemTrackerLimiter* _parent; // The parent of this tracker.
+    std::shared_ptr<MemTrackerLimiter> _parent; // The parent of this tracker.
 
     // this tracker limiter plus all of its ancestors
     std::vector<MemTrackerLimiter*> _all_ancestors;
@@ -199,13 +198,12 @@ private:
     // update that of its children).
     mutable std::mutex _child_tracker_limiter_lock;
     std::list<MemTrackerLimiter*> _child_tracker_limiters;
+    // Iterator into parent_->_child_tracker_limiters for this object. Stored 
to have O(1) remove.
+    std::list<MemTrackerLimiter*>::iterator _child_tracker_it;
 
     // The number of child trackers that have been added.
     std::atomic_size_t _had_child_count = 0;
 
-    // Iterator into parent_->_child_tracker_limiters for this object. Stored 
to have O(1) remove.
-    std::list<MemTrackerLimiter*>::iterator _child_tracker_it;
-
     // Lock to protect gc_memory(). This prevents many GCs from occurring at 
once.
     std::mutex _gc_lock;
     // Functions to call after the limit is reached to free memory.
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp 
b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 24a8c95180..3c775db5ec 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -23,47 +23,49 @@
 
 namespace doris {
 
-MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const 
std::string& task_id,
-                                                                      int64_t 
mem_limit,
-                                                                      const 
std::string& label,
-                                                                      
MemTrackerLimiter* parent) {
+std::shared_ptr<MemTrackerLimiter> 
MemTrackerTaskPool::register_task_mem_tracker_impl(
+        const std::string& task_id, int64_t mem_limit, const std::string& 
label,
+        const std::shared_ptr<MemTrackerLimiter>& parent) {
     DCHECK(!task_id.empty());
     // First time this task_id registered, make a new object, otherwise do 
nothing.
     // Combine new tracker and emplace into one operation to avoid the use of 
locks
     // Name for task MemTrackers. '$0' is replaced with the task id.
+    std::shared_ptr<MemTrackerLimiter> tracker;
     bool new_emplace = _task_mem_trackers.lazy_emplace_l(
-            task_id, [&](std::shared_ptr<MemTrackerLimiter>) {},
+            task_id, [&](const std::shared_ptr<MemTrackerLimiter>& v) { 
tracker = v; },
             [&](const auto& ctor) {
-                ctor(task_id, std::make_shared<MemTrackerLimiter>(mem_limit, 
label, parent));
+                tracker = std::make_shared<MemTrackerLimiter>(mem_limit, 
label, parent);
+                ctor(task_id, tracker);
             });
     if (new_emplace) {
         LOG(INFO) << "Register query/load memory tracker, query/load id: " << 
task_id
                   << " limit: " << PrettyPrinter::print(mem_limit, 
TUnit::BYTES);
     }
-    return _task_mem_trackers[task_id].get();
+    return tracker;
 }
 
-MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const 
std::string& query_id,
-                                                                  int64_t 
mem_limit) {
+std::shared_ptr<MemTrackerLimiter> 
MemTrackerTaskPool::register_query_mem_tracker(
+        const std::string& query_id, int64_t mem_limit) {
     return register_task_mem_tracker_impl(query_id, mem_limit,
                                           fmt::format("Query#queryId={}", 
query_id),
                                           
ExecEnv::GetInstance()->query_pool_mem_tracker());
 }
 
-MemTrackerLimiter* MemTrackerTaskPool::register_load_mem_tracker(const 
std::string& load_id,
-                                                                 int64_t 
mem_limit) {
+std::shared_ptr<MemTrackerLimiter> 
MemTrackerTaskPool::register_load_mem_tracker(
+        const std::string& load_id, int64_t mem_limit) {
     // In load, the query id of the fragment is executed, which is the same as 
the load id of the load channel.
     return register_task_mem_tracker_impl(load_id, mem_limit,
                                           fmt::format("Load#queryId={}", 
load_id),
                                           
ExecEnv::GetInstance()->load_pool_mem_tracker());
 }
 
-MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& 
task_id) {
+std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::get_task_mem_tracker(
+        const std::string& task_id) {
     DCHECK(!task_id.empty());
-    MemTrackerLimiter* tracker = nullptr;
+    std::shared_ptr<MemTrackerLimiter> tracker = nullptr;
     // Avoid using locks to resolve erase conflicts
     _task_mem_trackers.if_contains(
-            task_id, [&tracker](std::shared_ptr<MemTrackerLimiter> v) { 
tracker = v.get(); });
+            task_id, [&tracker](const std::shared_ptr<MemTrackerLimiter>& v) { 
tracker = v; });
     return tracker;
 }
 
@@ -74,8 +76,8 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
             // Unknown exception case with high concurrency, after 
_task_mem_trackers.erase,
             // the key still exists in _task_mem_trackers. 
https://github.com/apache/incubator-doris/issues/10006
             expired_task_ids.emplace_back(it->first);
-        } else if (it->second->remain_child_count() == 0 && 
it->second->had_child_count() != 0) {
-            // No RuntimeState uses this task MemTracker, it is only 
referenced by this map,
+        } else if (it->second.use_count() == 1 && 
it->second->had_child_count() != 0) {
+            // No RuntimeState uses this task MemTrackerLimiter, it is only 
referenced by this map,
             // and tracker was not created soon, delete it.
             //
             // If consumption is not equal to 0 before query mem tracker is 
destructed,
@@ -92,20 +94,12 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
             
it->second->parent()->consumption_revise(-it->second->consumption());
             LOG(INFO) << "Deregister query/load memory tracker, 
queryId/loadId: " << it->first;
             expired_task_ids.emplace_back(it->first);
-        } else {
-            // Log limit exceeded query tracker.
-            if (it->second->limit_exceeded()) {
-                it->second->mem_limit_exceeded(
-                        nullptr,
-                        fmt::format("Task mem limit exceeded but no cancel, 
queryId:{}", it->first),
-                        0, Status::OK());
-            }
         }
     }
     for (auto tid : expired_task_ids) {
         // Verify the condition again to make sure the tracker is not being 
used again.
-        _task_mem_trackers.erase_if(tid, 
[&](std::shared_ptr<MemTrackerLimiter> v) {
-            return !v || v->remain_child_count() == 0;
+        _task_mem_trackers.erase_if(tid, [&](const 
std::shared_ptr<MemTrackerLimiter>& v) {
+            return !v || v.use_count() == 1;
         });
     }
 }
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h 
b/be/src/runtime/memory/mem_tracker_task_pool.h
index 4890d72713..1958542fe9 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.h
+++ b/be/src/runtime/memory/mem_tracker_task_pool.h
@@ -32,26 +32,28 @@ using TaskTrackersMap = phmap::parallel_flat_hash_map<
 // Global task pool for query MemTrackers. Owned by ExecEnv.
 class MemTrackerTaskPool {
 public:
-    // Construct a MemTracker object for 'task_id' with 'mem_limit' as the 
memory limit.
-    // The MemTracker is a child of the pool MemTracker, Calling this with the 
same
-    // 'task_id' will return the same MemTracker object. This is used to track 
the local
+    // Construct a MemTrackerLimiter object for 'task_id' with 'mem_limit' as 
the memory limit.
+    // The MemTrackerLimiter is a child of the pool MemTrackerLimiter, Calling 
this with the same
+    // 'task_id' will return the same MemTrackerLimiter object. This is used 
to track the local
     // memory usage of all tasks executing. The first time this is called for 
a task,
-    // a new MemTracker object is created with the pool tracker as its parent.
+    // a new MemTrackerLimiter object is created with the pool tracker as its 
parent.
     // Newly created trackers will always have a limit of -1.
-    MemTrackerLimiter* register_task_mem_tracker_impl(const std::string& 
task_id, int64_t mem_limit,
-                                                      const std::string& label,
-                                                      MemTrackerLimiter* 
parent);
-    MemTrackerLimiter* register_query_mem_tracker(const std::string& query_id, 
int64_t mem_limit);
-    MemTrackerLimiter* register_load_mem_tracker(const std::string& load_id, 
int64_t mem_limit);
+    std::shared_ptr<MemTrackerLimiter> register_task_mem_tracker_impl(
+            const std::string& task_id, int64_t mem_limit, const std::string& 
label,
+            const std::shared_ptr<MemTrackerLimiter>& parent);
+    std::shared_ptr<MemTrackerLimiter> register_query_mem_tracker(const 
std::string& query_id,
+                                                                  int64_t 
mem_limit);
+    std::shared_ptr<MemTrackerLimiter> register_load_mem_tracker(const 
std::string& load_id,
+                                                                 int64_t 
mem_limit);
 
-    MemTrackerLimiter* get_task_mem_tracker(const std::string& task_id);
+    std::shared_ptr<MemTrackerLimiter> get_task_mem_tracker(const std::string& 
task_id);
 
     // Remove the mem tracker that has ended the query.
     void logout_task_mem_tracker();
 
 private:
-    // All per-task MemTracker objects.
-    // The life cycle of task memtracker in the process is the same as task 
runtime state,
+    // All per-task MemTrackerLimiter objects.
+    // The life cycle of task MemTrackerLimiter in the process is the same as 
task runtime state,
     // MemTrackers will be removed from this map after query finish or cancel.
     TaskTrackersMap _task_mem_trackers;
 };
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp 
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 7841a7cb6a..b6f1ebde33 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -24,10 +24,10 @@
 
 namespace doris {
 
-void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
-                                                 const std::string& task_id,
-                                                 const TUniqueId& 
fragment_instance_id,
-                                                 MemTrackerLimiter* 
mem_tracker) {
+void ThreadMemTrackerMgr::attach_limiter_tracker(
+        const std::string& cancel_msg, const std::string& task_id,
+        const TUniqueId& fragment_instance_id,
+        const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
     DCHECK(mem_tracker);
     flush_untracked_mem<false>();
     _task_id = task_id;
@@ -37,8 +37,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const 
std::string& cancel_msg,
 }
 
 void ThreadMemTrackerMgr::detach_limiter_tracker() {
-    // Do not flush untracked mem, instance executor thread may exit after 
instance fragment executor thread,
-    // `instance_mem_tracker` will be null pointer, which is not a graceful 
exit.
+    flush_untracked_mem<false>();
     _task_id = "";
     _fragment_instance_id = TUniqueId();
     _exceed_cb.cancel_msg = "";
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 2d23388a81..d3940d1e50 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -77,7 +77,7 @@ public:
     // After attach, the current thread TCMalloc Hook starts to 
consume/release task mem_tracker
     void attach_limiter_tracker(const std::string& cancel_msg, const 
std::string& task_id,
                                 const TUniqueId& fragment_instance_id,
-                                MemTrackerLimiter* mem_tracker);
+                                const std::shared_ptr<MemTrackerLimiter>& 
mem_tracker);
 
     void detach_limiter_tracker();
 
@@ -116,7 +116,7 @@ public:
 
     bool is_attach_task() { return _task_id != ""; }
 
-    MemTrackerLimiter* limiter_mem_tracker() { return _limiter_tracker; }
+    std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return 
_limiter_tracker; }
 
     void set_check_limit(bool check_limit) { _check_limit = check_limit; }
     void set_check_attach(bool check_attach) { _check_attach = check_attach; }
@@ -145,7 +145,7 @@ private:
     // Frequent calls to unordered_map _untracked_mems[] in consume will 
degrade performance.
     int64_t _untracked_mem = 0;
 
-    MemTrackerLimiter* _limiter_tracker;
+    std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
     std::vector<MemTracker*> _consumer_tracker_stack;
 
     // If true, call memtracker try_consume, otherwise call consume.
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 4c3d114447..2d128c13e4 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -241,9 +241,10 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& 
query_id) {
                 print_id(query_id), bytes_limit);
     } else {
         DCHECK(false);
+        _query_mem_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker();
     }
 
-    _instance_mem_tracker = std::make_unique<MemTrackerLimiter>(
+    _instance_mem_tracker = std::make_shared<MemTrackerLimiter>(
             bytes_limit, "RuntimeState:instance:" + 
print_id(_fragment_instance_id),
             _query_mem_tracker, &_profile);
 
@@ -263,7 +264,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& 
query_id) {
 
 Status RuntimeState::init_instance_mem_tracker() {
     _query_mem_tracker = nullptr;
-    _instance_mem_tracker = std::make_unique<MemTrackerLimiter>(-1, 
"RuntimeState:instance");
+    _instance_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, 
"RuntimeState:instance");
     return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f561adecf8..9ab470bf64 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -127,8 +127,8 @@ public:
     const TUniqueId& query_id() const { return _query_id; }
     const TUniqueId& fragment_instance_id() const { return 
_fragment_instance_id; }
     ExecEnv* exec_env() { return _exec_env; }
-    MemTrackerLimiter* query_mem_tracker() { return _query_mem_tracker; }
-    MemTrackerLimiter* instance_mem_tracker() { return 
_instance_mem_tracker.get(); }
+    std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return 
_query_mem_tracker; }
+    std::shared_ptr<MemTrackerLimiter> instance_mem_tracker() { return 
_instance_mem_tracker; }
     ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; }
 
     void set_fragment_root_id(PlanNodeId id) {
@@ -390,10 +390,10 @@ private:
 
     // MemTracker that is shared by all fragment instances running on this 
host.
     // The query mem tracker must be released after the _instance_mem_tracker.
-    MemTrackerLimiter* _query_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
 
     // Memory usage of this fragment instance
-    std::unique_ptr<MemTrackerLimiter> _instance_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _instance_mem_tracker;
 
     // put runtime state before _obj_pool, so that it will be deconstructed 
after
     // _obj_pool. Because some of object in _obj_pool will use profile when 
deconstructing.
diff --git a/be/src/runtime/sorted_run_merger.cc 
b/be/src/runtime/sorted_run_merger.cc
index de44701293..28d347462f 100644
--- a/be/src/runtime/sorted_run_merger.cc
+++ b/be/src/runtime/sorted_run_merger.cc
@@ -182,7 +182,7 @@ private:
     // signal of new batch or the eos/cancelled condition
     std::condition_variable _batch_prepared_cv;
 
-    void process_sorted_run_task(MemTrackerLimiter* mem_tracker) {
+    void process_sorted_run_task(const std::shared_ptr<MemTrackerLimiter>& 
mem_tracker) {
         SCOPED_ATTACH_TASK(mem_tracker, ThreadContext::TaskType::QUERY);
         std::unique_lock<std::mutex> lock(_mutex);
         while (true) {
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 043f4c7eab..0934214f89 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -30,14 +30,15 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, 
MetricUnit::NOUNIT);
 
 std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
 
-TabletsChannel::TabletsChannel(const TabletsChannelKey& key, 
MemTrackerLimiter* parent_tracker,
+TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
+                               const std::shared_ptr<MemTrackerLimiter>& 
parent_tracker,
                                bool is_high_priority, bool is_vec)
         : _key(key),
           _state(kInitialized),
           _closed_senders(64),
           _is_high_priority(is_high_priority),
           _is_vec(is_vec) {
-    _mem_tracker = std::make_unique<MemTrackerLimiter>(
+    _mem_tracker = std::make_shared<MemTrackerLimiter>(
             -1, fmt::format("TabletsChannel#indexID={}", key.index_id), 
parent_tracker);
     static std::once_flag once_flag;
     std::call_once(once_flag, [] {
@@ -240,7 +241,7 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& request
         wrequest.ptable_schema_param = request.schema();
 
         DeltaWriter* writer = nullptr;
-        auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker.get(), 
_is_vec);
+        auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker, _is_vec);
         if (!st.ok()) {
             std::stringstream ss;
             ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 318dd879a4..78ca9ae076 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -60,8 +60,9 @@ class OlapTableSchemaParam;
 // Write channel for a particular (load, index).
 class TabletsChannel {
 public:
-    TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* 
parent_tracker,
-                   bool is_high_priority, bool is_vec);
+    TabletsChannel(const TabletsChannelKey& key,
+                   const std::shared_ptr<MemTrackerLimiter>& parent_tracker, 
bool is_high_priority,
+                   bool is_vec);
 
     ~TabletsChannel();
 
@@ -144,7 +145,7 @@ private:
 
     static std::atomic<uint64_t> _s_tablet_writer_count;
 
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 
     bool _is_high_priority = false;
 
diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index 6071defe64..7a6968b30f 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -29,8 +29,9 @@ ThreadContextPtr::ThreadContextPtr() {
     _init = true;
 }
 
-AttachTask::AttachTask(MemTrackerLimiter* mem_tracker, const 
ThreadContext::TaskType& type,
-                       const std::string& task_id, const TUniqueId& 
fragment_instance_id) {
+AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
+                       const ThreadContext::TaskType& type, const std::string& 
task_id,
+                       const TUniqueId& fragment_instance_id) {
     DCHECK(mem_tracker);
 #ifdef USE_MEM_TRACKER
     thread_context()->attach_task(type, task_id, fragment_instance_id, 
mem_tracker);
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 2c14929432..0acc7c6556 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -130,7 +130,8 @@ public:
     }
 
     void attach_task(const TaskType& type, const std::string& task_id,
-                     const TUniqueId& fragment_instance_id, MemTrackerLimiter* 
mem_tracker) {
+                     const TUniqueId& fragment_instance_id,
+                     const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
         DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && 
_task_id == "")
                 << ",new tracker label: " << mem_tracker->label() << ",old 
tracker label: "
                 << _thread_mem_tracker_mgr->limiter_mem_tracker()->label();
@@ -195,7 +196,7 @@ static ThreadContext* thread_context() {
 
 class AttachTask {
 public:
-    explicit AttachTask(MemTrackerLimiter* mem_tracker,
+    explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
                         const ThreadContext::TaskType& type = 
ThreadContext::TaskType::UNKNOWN,
                         const std::string& task_id = "",
                         const TUniqueId& fragment_instance_id = TUniqueId());
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 7603beacff..15eb95b4b3 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -118,20 +118,20 @@ void 
PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_
                                           const Status& extract_st) {
     std::string query_id;
     TUniqueId finst_id;
-    std::unique_ptr<MemTrackerLimiter> transmit_tracker;
+    std::shared_ptr<MemTrackerLimiter> transmit_tracker;
     if (request->has_query_id()) {
         query_id = print_id(request->query_id());
         finst_id.__set_hi(request->finst_id().hi());
         finst_id.__set_lo(request->finst_id().lo());
         // In some cases, query mem tracker does not exist in BE when transmit 
block, will get null pointer.
-        transmit_tracker = std::make_unique<MemTrackerLimiter>(
+        transmit_tracker = std::make_shared<MemTrackerLimiter>(
                 -1, fmt::format("QueryTransmit#queryId={}", query_id),
                 
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
     } else {
         query_id = "unkown_transmit_data";
-        transmit_tracker = std::make_unique<MemTrackerLimiter>(-1, 
"unkown_transmit_data");
+        transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, 
"unkown_transmit_data");
     }
-    SCOPED_ATTACH_TASK(transmit_tracker.get(), ThreadContext::TaskType::QUERY, 
query_id, finst_id);
+    SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, 
query_id, finst_id);
     VLOG_ROW << "transmit data: fragment_instance_id=" << 
print_id(request->finst_id())
              << " query_id=" << query_id << " node=" << request->node_id();
     // The response is accessed when done->Run is called in transmit_data(),
@@ -649,20 +649,20 @@ void 
PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl
                                            const Status& extract_st) {
     std::string query_id;
     TUniqueId finst_id;
-    std::unique_ptr<MemTrackerLimiter> transmit_tracker;
+    std::shared_ptr<MemTrackerLimiter> transmit_tracker;
     if (request->has_query_id()) {
         query_id = print_id(request->query_id());
         finst_id.__set_hi(request->finst_id().hi());
         finst_id.__set_lo(request->finst_id().lo());
         // In some cases, query mem tracker does not exist in BE when transmit 
block, will get null pointer.
-        transmit_tracker = std::make_unique<MemTrackerLimiter>(
+        transmit_tracker = std::make_shared<MemTrackerLimiter>(
                 -1, fmt::format("QueryTransmit#queryId={}", query_id),
                 
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
     } else {
         query_id = "unkown_transmit_block";
-        transmit_tracker = std::make_unique<MemTrackerLimiter>(-1, 
"unkown_transmit_block");
+        transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, 
"unkown_transmit_block");
     }
-    SCOPED_ATTACH_TASK(transmit_tracker.get(), ThreadContext::TaskType::QUERY, 
query_id, finst_id);
+    SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, 
query_id, finst_id);
     VLOG_ROW << "transmit block: fragment_instance_id=" << 
print_id(request->finst_id())
              << " query_id=" << query_id << " node=" << request->node_id();
     // The response is accessed when done->Run is called in transmit_block(),
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index bee1fcf10c..511fbbe19d 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -53,8 +53,8 @@ std::shared_ptr<VDataStreamRecvr> 
VDataStreamMgr::create_recvr(
     VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
               << ", node=" << dest_node_id;
     std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
-            this, row_desc, state->query_mem_tracker(), fragment_instance_id, 
dest_node_id,
-            num_senders, is_merging, buffer_size, profile, 
sub_plan_query_statistics_recvr));
+            this, row_desc, fragment_instance_id, dest_node_id, num_senders, 
is_merging,
+            buffer_size, profile, sub_plan_query_statistics_recvr));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
     std::lock_guard<std::mutex> l(_lock);
     _fragment_stream_set.insert(std::make_pair(fragment_instance_id, 
dest_node_id));
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 020f188cac..2fb7be3223 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -249,9 +249,8 @@ void VDataStreamRecvr::SenderQueue::close() {
 
 VDataStreamRecvr::VDataStreamRecvr(
         VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
-        MemTrackerLimiter* query_mem_tracker, const TUniqueId& 
fragment_instance_id,
-        PlanNodeId dest_node_id, int num_senders, bool is_merging, int 
total_buffer_limit,
-        RuntimeProfile* profile,
+        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
+        bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
         std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
         : _mgr(stream_mgr),
           _fragment_instance_id(fragment_instance_id),
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 87024a917a..bedd18bbce 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -51,9 +51,9 @@ class VExprContext;
 class VDataStreamRecvr {
 public:
     VDataStreamRecvr(VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
-                     MemTrackerLimiter* query_mem_tracker, const TUniqueId& 
fragment_instance_id,
-                     PlanNodeId dest_node_id, int num_senders, bool is_merging,
-                     int total_buffer_limit, RuntimeProfile* profile,
+                     const TUniqueId& fragment_instance_id, PlanNodeId 
dest_node_id,
+                     int num_senders, bool is_merging, int total_buffer_limit,
+                     RuntimeProfile* profile,
                      std::shared_ptr<QueryStatisticsRecvr> 
sub_plan_query_statistics_recvr);
 
     ~VDataStreamRecvr();
diff --git a/be/test/runtime/mem_limit_test.cpp 
b/be/test/runtime/mem_limit_test.cpp
index b0a298ff8a..eeadb41b28 100644
--- a/be/test/runtime/mem_limit_test.cpp
+++ b/be/test/runtime/mem_limit_test.cpp
@@ -53,9 +53,9 @@ TEST(MemTestTest, SingleTrackerWithLimit) {
 }
 
 TEST(MemTestTest, TrackerHierarchy) {
-    auto p = std::make_unique<MemTrackerLimiter>(100);
-    auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p.get());
-    auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p.get());
+    auto p = std::make_shared<MemTrackerLimiter>(100);
+    auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p);
+    auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p);
 
     // everything below limits
     c1->consume(60);
@@ -96,9 +96,9 @@ TEST(MemTestTest, TrackerHierarchy) {
 }
 
 TEST(MemTestTest, TrackerHierarchyTryConsume) {
-    auto p = std::make_unique<MemTrackerLimiter>(100);
-    auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p.get());
-    auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p.get());
+    auto p = std::make_shared<MemTrackerLimiter>(100);
+    auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p);
+    auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p);
 
     // everything below limits
     bool consumption = c1->try_consume(60).ok();
diff --git a/be/test/testutil/run_all_tests.cpp 
b/be/test/testutil/run_all_tests.cpp
index b84cc6c375..0f6579ec9e 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -29,7 +29,8 @@
 #include "util/mem_info.h"
 
 int main(int argc, char** argv) {
-    doris::MemTrackerLimiter* process_mem_tracker = new 
doris::MemTrackerLimiter(-1, "Process");
+    std::shared_ptr<doris::MemTrackerLimiter> process_mem_tracker =
+            std::make_shared<doris::MemTrackerLimiter>(-1, "Process");
     
doris::ExecEnv::GetInstance()->set_process_mem_tracker(process_mem_tracker);
     doris::thread_context()->_thread_mem_tracker_mgr->init();
     doris::StoragePageCache::create_global_cache(1 << 30, 10);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to