This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new cf05284da6 [cherrypick](branch1.2) Pick memory manager related fixes (#16184)
cf05284da6 is described below

commit cf05284da64d5c8a05f6b491e00cedb4cd99091d
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Mon Jan 30 14:47:14 2023 +0800

    [cherrypick](branch1.2) Pick memory manager related fixes (#16184)
    
    cherry pick
    #15716
    #16083
    #16084
---
 be/src/common/config.h                          |   6 +-
 be/src/common/daemon.cpp                        |  42 +++++++---
 be/src/exec/analytic_eval_node.cpp              |   3 -
 be/src/exec/blocking_join_node.cpp              |   1 -
 be/src/exec/broker_scan_node.cpp                |   1 -
 be/src/exec/es_http_scan_node.cpp               |   1 -
 be/src/exec/exec_node.cpp                       |   6 +-
 be/src/exec/exec_node.h                         |   6 +-
 be/src/exec/hash_join_node.cpp                  |   1 -
 be/src/exec/olap_scan_node.cpp                  |   2 -
 be/src/exec/set_operation_node.cpp              |   2 -
 be/src/exec/spill_sort_node.cc                  |   3 -
 be/src/exec/table_function_node.cpp             |   3 -
 be/src/exec/topn_node.cpp                       |   3 -
 be/src/exec/union_node.cpp                      |   3 -
 be/src/http/default_path_handlers.cpp           |   6 +-
 be/src/olap/delta_writer.cpp                    |   4 +-
 be/src/olap/memtable.cpp                        |   4 +-
 be/src/olap/olap_server.cpp                     |  10 +--
 be/src/olap/push_handler.cpp                    |   4 +-
 be/src/olap/storage_engine.cpp                  |   7 +-
 be/src/olap/storage_engine.h                    |   2 -
 be/src/olap/tablet_manager.cpp                  |  11 ++-
 be/src/olap/tablet_manager.h                    |   1 +
 be/src/olap/task/engine_batch_load_task.cpp     |   2 +-
 be/src/olap/task/engine_checksum_task.cpp       |   2 +-
 be/src/runtime/data_stream_recvr.cc             |   5 +-
 be/src/runtime/data_stream_sender.cpp           |   3 +-
 be/src/runtime/exec_env.h                       |   7 +-
 be/src/runtime/exec_env_init.cpp                |  12 ++-
 be/src/runtime/load_channel_mgr.cpp             |   9 +-
 be/src/runtime/load_channel_mgr.h               |  27 +++---
 be/src/runtime/memory/mem_tracker.cpp           |  13 ++-
 be/src/runtime/memory/mem_tracker.h             |   9 +-
 be/src/runtime/memory/mem_tracker_limiter.cpp   | 104 +++++++++++++++---------
 be/src/runtime/memory/mem_tracker_limiter.h     |  42 ++++++----
 be/src/runtime/plan_fragment_executor.cpp       |   1 -
 be/src/runtime/runtime_filter_mgr.cpp           |   6 +-
 be/src/runtime/runtime_state.cpp                |   5 +-
 be/src/runtime/runtime_state.h                  |   8 --
 be/src/util/mem_info.cpp                        |  59 ++++++++++----
 be/src/util/mem_info.h                          |  10 ++-
 be/src/vec/exec/join/vhash_join_node.cpp        |   2 -
 be/src/vec/exec/join/vjoin_node_base.cpp        |   2 -
 be/src/vec/exec/join/vnested_loop_join_node.cpp |   3 -
 be/src/vec/exec/scan/new_es_scan_node.cpp       |   1 -
 be/src/vec/exec/scan/new_jdbc_scan_node.cpp     |   1 -
 be/src/vec/exec/scan/new_odbc_scan_node.cpp     |   1 -
 be/src/vec/exec/scan/new_olap_scan_node.cpp     |   1 -
 be/src/vec/exec/scan/scanner_scheduler.cpp      |   1 -
 be/src/vec/exec/scan/vscan_node.cpp             |   5 --
 be/src/vec/exec/vaggregation_node.cpp           |   3 -
 be/src/vec/exec/vanalytic_eval_node.cpp         |   3 -
 be/src/vec/exec/vbroker_scan_node.cpp           |   4 -
 be/src/vec/exec/vexchange_node.cpp              |   3 -
 be/src/vec/exec/vmysql_scan_node.cpp            |   2 -
 be/src/vec/exec/vschema_scan_node.cpp           |   2 -
 be/src/vec/exec/vset_operation_node.cpp         |   2 -
 be/src/vec/exec/vsort_node.cpp                  |   3 -
 be/src/vec/runtime/vdata_stream_recvr.cpp       |   5 +-
 be/src/vec/sink/vdata_stream_sender.cpp         |   3 +-
 be/test/testutil/run_all_tests.cpp              |   5 +-
 62 files changed, 275 insertions(+), 233 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 425005e021..8eb20699da 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -74,6 +74,10 @@ CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918");
 // The size of the memory that gc wants to release each time, as a percentage of the mem limit.
 CONF_mString(process_minor_gc_size, "10%");
 CONF_mString(process_full_gc_size, "20%");
+// Some caches have their own gc threads, such as segment cache.
+// For caches that do not have a separate gc thread, perform regular gc in the memory maintenance thread.
+// Currently only storage page cache, chunk allocator, more in the future.
+CONF_mInt32(cache_gc_interval_s, "60");
 
 // If true, when the process does not exceed the soft mem limit, the query memory will not be limited;
 // when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently
@@ -505,7 +509,7 @@ CONF_String(buffer_pool_limit, "20%");
 CONF_String(buffer_pool_clean_pages_limit, "50%");
 
 // Sleep time in milliseconds between memory maintenance iterations
-CONF_mInt64(memory_maintenance_sleep_time_ms, "500");
+CONF_mInt32(memory_maintenance_sleep_time_ms, "500");
 
 // Sleep time in milliseconds between load channel memory refresh iterations
 CONF_mInt64(load_channel_memory_refresh_sleep_time_ms, "100");
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 992b6528b3..149b2f1ce4 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -202,7 +202,9 @@ void Daemon::buffer_pool_gc_thread() {
 }
 
 void Daemon::memory_maintenance_thread() {
-    int64_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
+    int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
+    int32_t cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
+    int64_t cache_gc_freed_mem = 0;
     while (!_stop_background_threads_latch.wait_for(
             std::chrono::milliseconds(interval_milliseconds))) {
         if (!MemInfo::initialized()) {
@@ -217,29 +219,49 @@ void Daemon::memory_maintenance_thread() {
         doris::MemInfo::refresh_allocator_mem();
 #endif
         doris::MemInfo::refresh_proc_mem_no_allocator_cache();
-        LOG_EVERY_N(INFO, 10) << MemTrackerLimiter::process_mem_log_str();
 
         // Refresh mem tracker each type metrics.
         doris::MemTrackerLimiter::refresh_global_counter();
-        if (doris::config::memory_debug) {
-            doris::MemTrackerLimiter::print_log_process_usage("memory_debug", 
false);
-        }
-        doris::MemTrackerLimiter::enable_print_log_process_usage();
 
         // If system available memory is not enough, or the process memory exceeds the limit, reduce refresh interval.
         if (doris::MemInfo::sys_mem_available() <
                     doris::MemInfo::sys_mem_available_low_water_mark() ||
            doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit()) {
-            interval_milliseconds = 100;
-            doris::MemInfo::process_full_gc();
+            doris::MemTrackerLimiter::print_log_process_usage("process full 
gc", false);
+            interval_milliseconds = std::min(100, 
config::memory_maintenance_sleep_time_ms);
+            if (doris::MemInfo::process_full_gc()) {
+                // If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
+                doris::MemTrackerLimiter::enable_print_log_process_usage();
+            }
+            cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
         } else if (doris::MemInfo::sys_mem_available() <
                            doris::MemInfo::sys_mem_available_warning_water_mark() ||
                    doris::MemInfo::proc_mem_no_allocator_cache() >=
                            doris::MemInfo::soft_mem_limit()) {
-            interval_milliseconds = 200;
-            doris::MemInfo::process_minor_gc();
+            doris::MemTrackerLimiter::print_log_process_usage("process minor 
gc", false);
+            interval_milliseconds = std::min(200, 
config::memory_maintenance_sleep_time_ms);
+            if (doris::MemInfo::process_minor_gc()) {
+                doris::MemTrackerLimiter::enable_print_log_process_usage();
+            }
+            cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
         } else {
+            doris::MemTrackerLimiter::enable_print_log_process_usage();
             interval_milliseconds = config::memory_maintenance_sleep_time_ms;
+            if (doris::config::memory_debug) {
+                LOG_EVERY_N(WARNING, 20) << doris::MemTrackerLimiter::log_process_usage_str(
+                        "memory debug", false); // default 10s print once
+            } else {
+                LOG_EVERY_N(INFO, 10)
+                        << MemTrackerLimiter::process_mem_log_str(); // default 5s print once
+            }
+            cache_gc_interval_ms -= interval_milliseconds;
+            if (cache_gc_interval_ms < 0) {
+                cache_gc_freed_mem = 0;
+                doris::MemInfo::process_cache_gc(cache_gc_freed_mem);
+                LOG(INFO) << fmt::format("Process regular GC Cache, Free 
Memory {} Bytes",
+                                         cache_gc_freed_mem); // default 6s 
print once
+                cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
+            }
         }
     }
 }
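
The daemon.cpp hunk above adds a countdown, driven by the new cache_gc_interval_s config, that triggers a regular cache GC only while memory pressure is normal. Below is a minimal, self-contained C++ sketch of that countdown pattern; process_cache_gc() here is a stub standing in for doris::MemInfo::process_cache_gc(), the interval is shortened for the demo, and the fixed-count loop replaces the daemon's latch-based wait.

    #include <chrono>
    #include <cstdint>
    #include <iostream>
    #include <thread>

    // Stub standing in for doris::MemInfo::process_cache_gc(); the real call frees the
    // storage page cache / chunk allocator and reports the freed bytes.
    static void process_cache_gc(int64_t& freed_bytes) { freed_bytes = 0; }

    int main() {
        int32_t interval_ms = 500;              // config::memory_maintenance_sleep_time_ms default
        int32_t cache_gc_interval_ms = 2 * 1000; // 60 * 1000 with the default cache_gc_interval_s; shortened here
        int64_t cache_gc_freed_mem = 0;
        for (int i = 0; i < 5; ++i) {           // a few iterations instead of the daemon's latch wait
            std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms));
            cache_gc_interval_ms -= interval_ms; // counts down only on the "memory is fine" branch
            if (cache_gc_interval_ms < 0) {
                cache_gc_freed_mem = 0;
                process_cache_gc(cache_gc_freed_mem);
                std::cout << "Process regular GC Cache, Free Memory " << cache_gc_freed_mem
                          << " Bytes\n";
                cache_gc_interval_ms = 2 * 1000; // re-arm the countdown
            }
        }
        return 0;
    }
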
diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index 19e6fe87c4..9818ddb433 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -142,7 +142,6 @@ Status AnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status AnalyticEvalNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     DCHECK(child(0)->row_desc().is_prefix_of(row_desc()));
     _child_tuple_desc = child(0)->row_desc().tuple_descriptors()[0];
     _curr_tuple_pool.reset(new MemPool(mem_tracker()));
@@ -186,7 +185,6 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) {
 Status AnalyticEvalNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     //RETURN_IF_ERROR(QueryMaintenance(state));
     RETURN_IF_ERROR(child(0)->open(state));
@@ -814,7 +812,6 @@ inline int64_t AnalyticEvalNode::num_output_rows_ready() const {
 
 Status AnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     //RETURN_IF_ERROR(QueryMaintenance(state));
     RETURN_IF_ERROR(state->check_query_state("Analytic eval, while 
get_next."));
diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp
index 0d366ab1d4..10419a3499 100644
--- a/be/src/exec/blocking_join_node.cpp
+++ b/be/src/exec/blocking_join_node.cpp
@@ -93,7 +93,6 @@ Status BlockingJoinNode::close(RuntimeState* state) {
 
 void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     status->set_value(construct_build_side(state));
 }
 
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index a281d29c6e..f822c35bbb 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -376,7 +376,6 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range,
 
 void BrokerScanNode::scanner_worker(int start_idx, int length) {
     SCOPED_ATTACH_TASK(_runtime_state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     // Clone expr context
     std::vector<ExprContext*> scanner_expr_ctxs;
     auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs);
diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index cea7068631..14d2911a0b 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -424,7 +424,6 @@ static std::string get_host_port(const std::vector<TNetworkAddress>& es_hosts) {
 
 void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
     SCOPED_ATTACH_TASK(_runtime_state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     // Clone expr context
     std::vector<ExprContext*> scanner_expr_ctxs;
     DCHECK(start_idx < length);
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index d804533e88..d73888e0b8 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -220,9 +220,8 @@ Status ExecNode::prepare(RuntimeState* state) {
             std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
                                runtime_profile()->total_time_counter()),
             "");
-    _mem_tracker = std::make_shared<MemTracker>("ExecNode:" + 
_runtime_profile->name(),
-                                                _runtime_profile.get());
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+    _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + 
_runtime_profile->name(),
+                                                _runtime_profile.get(), 
nullptr, "PeakMemoryUsage");
 
     if (_vconjunct_ctx_ptr) {
         RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, intermediate_row_desc()));
@@ -244,7 +243,6 @@ Status ExecNode::prepare(RuntimeState* state) {
 }
 
 Status ExecNode::open(RuntimeState* state) {
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     if (_vconjunct_ctx_ptr) {
         RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state));
     }
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 1de8f921a8..5ea19d8d81 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -195,7 +195,6 @@ public:
     RuntimeProfile::Counter* memory_used_counter() const { return _memory_used_counter; }
 
     MemTracker* mem_tracker() const { return _mem_tracker.get(); }
-    std::shared_ptr<MemTracker> mem_tracker_shared() const { return _mem_tracker; }
 
     OpentelemetrySpan get_next_span() { return _get_next_span; }
 
@@ -299,8 +298,9 @@ protected:
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
 
-    /// Account for peak memory used by this node
-    std::shared_ptr<MemTracker> _mem_tracker;
+    // Record this node's memory size. It is expected that this manually maintained value is accurate,
+    // which will provide a reference for operator memory.
+    std::unique_ptr<MemTracker> _mem_tracker;
 
     RuntimeProfile::Counter* _rows_returned_counter;
     RuntimeProfile::Counter* _rows_returned_rate;
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 471801abca..73b3720c0b 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -184,7 +184,6 @@ Status HashJoinNode::close(RuntimeState* state) {
 
 void HashJoinNode::probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) {
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     status->set_value(child(0)->open(state));
 }
 
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 35f3ef5023..3a88d7bd99 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1487,7 +1487,6 @@ Status 
OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
 void OlapScanNode::transfer_thread(RuntimeState* state) {
     // scanner open pushdown to scanThread
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     Status status = Status::OK();
     for (auto scanner : _olap_scanners) {
         status = Expr::clone_if_not_exists(_conjunct_ctxs, state, 
scanner->conjunct_ctxs());
@@ -1666,7 +1665,6 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
 }
 
 void OlapScanNode::scanner_thread(OlapScanner* scanner) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     Thread::set_self_name("olap_scanner");
     if (UNLIKELY(_transfer_done)) {
         _scanner_done = true;
diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp
index 3458c57293..5ce2c947e1 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -42,7 +42,6 @@ Status SetOperationNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 Status SetOperationNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     DCHECK(_tuple_desc != nullptr);
     _build_pool.reset(new MemPool(mem_tracker()));
@@ -130,7 +129,6 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow* 
other) {
 Status SetOperationNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::open(state));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     // open result expr lists.
     for (const std::vector<ExprContext*>& exprs : _child_expr_lists) {
diff --git a/be/src/exec/spill_sort_node.cc b/be/src/exec/spill_sort_node.cc
index a40d889b0e..b74c9ba467 100644
--- a/be/src/exec/spill_sort_node.cc
+++ b/be/src/exec/spill_sort_node.cc
@@ -42,7 +42,6 @@ Status SpillSortNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 Status SpillSortNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), 
_row_descriptor));
     // AddExprCtxsToFree(_sort_exec_exprs);
     return Status::OK();
@@ -51,7 +50,6 @@ Status SpillSortNode::prepare(RuntimeState* state) {
 Status SpillSortNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(_sort_exec_exprs.open(state));
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("Spill sort, while open."));
@@ -81,7 +79,6 @@ Status SpillSortNode::open(RuntimeState* state) {
 
 Status SpillSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* 
eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("Spill sort, while getting 
next."));
 
diff --git a/be/src/exec/table_function_node.cpp b/be/src/exec/table_function_node.cpp
index 1eb94aac0e..249d465b51 100644
--- a/be/src/exec/table_function_node.cpp
+++ b/be/src/exec/table_function_node.cpp
@@ -90,7 +90,6 @@ bool TableFunctionNode::_is_inner_and_empty() {
 
 Status TableFunctionNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     _num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered", 
TUnit::UNIT);
 
@@ -106,7 +105,6 @@ Status TableFunctionNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     RETURN_IF_ERROR(Expr::open(_fn_ctxs, state));
     RETURN_IF_ERROR(vectorized::VExpr::open(_vfn_ctxs, state));
@@ -198,7 +196,6 @@ bool TableFunctionNode::_roll_table_functions(int 
last_eos_idx) {
 // And the inner loop is to expand the row by table functions, and output row 
by row.
 Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, 
bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     const RowDescriptor& parent_rowdesc = row_batch->row_desc();
     const RowDescriptor& child_rowdesc = _children[0]->row_desc();
diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp
index f5215b10d5..0a2ab6eceb 100644
--- a/be/src/exec/topn_node.cpp
+++ b/be/src/exec/topn_node.cpp
@@ -60,7 +60,6 @@ Status TopNNode::init(const TPlanNode& tnode, RuntimeState* 
state) {
 Status TopNNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     _tuple_pool.reset(new MemPool(mem_tracker()));
     RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), 
_row_descriptor));
     // AddExprCtxsToFree(_sort_exec_exprs);
@@ -77,7 +76,6 @@ Status TopNNode::prepare(RuntimeState* state) {
 Status TopNNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("Top n, before open."));
     RETURN_IF_ERROR(_sort_exec_exprs.open(state));
@@ -129,7 +127,6 @@ Status TopNNode::open(RuntimeState* state) {
 
 Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) 
{
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("Top n, before moving result to 
row_batch."));
 
diff --git a/be/src/exec/union_node.cpp b/be/src/exec/union_node.cpp
index de5e7d7aec..a006fd53e7 100644
--- a/be/src/exec/union_node.cpp
+++ b/be/src/exec/union_node.cpp
@@ -69,7 +69,6 @@ Status UnionNode::init(const TPlanNode& tnode, RuntimeState* 
state) {
 Status UnionNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     DCHECK(_tuple_desc != nullptr);
     _materialize_exprs_evaluate_timer =
@@ -96,7 +95,6 @@ Status UnionNode::prepare(RuntimeState* state) {
 Status UnionNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // open const expr lists.
     for (const std::vector<ExprContext*>& exprs : _const_expr_lists) {
         RETURN_IF_ERROR(Expr::open(exprs, state));
@@ -234,7 +232,6 @@ Status UnionNode::get_next_const(RuntimeState* state, 
RowBatch* row_batch) {
 
 Status UnionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* 
eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
 
     if (_to_close_child_idx != -1) {
diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp
index 42a29d21e9..e0ddfbbb37 100644
--- a/be/src/http/default_path_handlers.cpp
+++ b/be/src/http/default_path_handlers.cpp
@@ -147,11 +147,9 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr
                                                    MemTrackerLimiter::Type::SCHEMA_CHANGE);
         } else if (iter->second == "clone") {
             MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::CLONE);
-        } else if (iter->second == "batch_load") {
-            MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::BATCHLOAD);
-        } else if (iter->second == "consistency") {
+        } else if (iter->second == "experimental") {
             MemTrackerLimiter::make_type_snapshots(&snapshots,
-                                                   MemTrackerLimiter::Type::CONSISTENCY);
+                                                   MemTrackerLimiter::Type::EXPERIMENTAL);
         }
     } else {
         (*output) << "<h4>*Note: (see documentation for details)</h4>\n";
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 0a05a03006..ab5d955089 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -292,11 +292,11 @@ void DeltaWriter::_reset_mem_table() {
     auto mem_table_insert_tracker = std::make_shared<MemTracker>(
             
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
                         std::to_string(tablet_id()), _mem_table_num, 
_load_id.to_string()),
-            nullptr, 
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
     auto mem_table_flush_tracker = std::make_shared<MemTracker>(
             
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
                         std::to_string(tablet_id()), _mem_table_num++, 
_load_id.to_string()),
-            nullptr, 
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
 #else
     auto mem_table_insert_tracker = std::make_shared<MemTracker>(
             
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 1f8465c0f3..adc57dfee7 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -56,8 +56,8 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, 
const TabletSchema* t
           _cur_max_version(cur_max_version) {
 #ifndef BE_TEST
     _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
-            fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id())), nullptr,
-            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+            fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id())),
+            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
 #else
     _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
             fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id())));
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 6e39b0f093..db1fac59bb 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -124,20 +124,14 @@ Status StorageEngine::start_bg_threads() {
             scoped_refptr<Thread> path_scan_thread;
             RETURN_IF_ERROR(Thread::create(
                     "StorageEngine", "path_scan_thread",
-                    [this, data_dir]() {
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
-                        this->_path_scan_thread_callback(data_dir);
-                    },
+                    [this, data_dir]() { 
this->_path_scan_thread_callback(data_dir); },
                     &path_scan_thread));
             _path_scan_threads.emplace_back(path_scan_thread);
 
             scoped_refptr<Thread> path_gc_thread;
             RETURN_IF_ERROR(Thread::create(
                     "StorageEngine", "path_gc_thread",
-                    [this, data_dir]() {
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
-                        this->_path_gc_thread_callback(data_dir);
-                    },
+                    [this, data_dir]() { 
this->_path_gc_thread_callback(data_dir); },
                     &path_gc_thread));
             _path_gc_threads.emplace_back(path_gc_thread);
         }
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 6a33fbb548..b174bbb6ea 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -819,8 +819,8 @@ Status PushBrokerReader::init(const Schema* schema, const 
TBrokerScanRange& t_sc
     }
     _runtime_profile = _runtime_state->runtime_profile();
     _runtime_profile->set_name("PushBrokerReader");
-    _mem_pool.reset(new MemPool(_runtime_state->scanner_mem_tracker().get()));
-    _tuple_buffer_pool.reset(new 
MemPool(_runtime_state->scanner_mem_tracker().get()));
+    _mem_pool.reset(new MemPool());
+    _tuple_buffer_pool.reset(new MemPool());
 
     _counter.reset(new ScannerCounter());
 
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index e73a5ba8c1..2ea8e20b7d 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -107,7 +107,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
           _effective_cluster_id(-1),
           _is_all_cluster_id_exist(true),
           _stopped(false),
-          _mem_tracker(std::make_shared<MemTracker>("StorageEngine")),
           
_segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")),
           
_segment_meta_mem_tracker(std::make_shared<MemTracker>("SegmentMeta")),
           _stop_background_threads_latch(1),
@@ -153,8 +152,7 @@ StorageEngine::~StorageEngine() {
 void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
     std::vector<std::thread> threads;
     for (auto data_dir : data_dirs) {
-        threads.emplace_back([this, data_dir] {
-            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+        threads.emplace_back([data_dir] {
             auto res = data_dir->load();
             if (!res.ok()) {
                 LOG(WARNING) << "io error when init load tables. res=" << res
@@ -199,8 +197,7 @@ Status StorageEngine::_init_store_map() {
         DataDir* store = new DataDir(path.path, path.capacity_bytes, 
path.storage_medium,
                                      _tablet_manager.get(), 
_txn_manager.get());
         tmp_stores.emplace_back(store);
-        threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
-            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+        threads.emplace_back([store, &error_msg_lock, &error_msg]() {
             auto st = store->init();
             if (!st.ok()) {
                 {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 49ff072a59..62ff7817ee 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -326,8 +326,6 @@ private:
     // map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we 
need custom hash func
     std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
 
-    // StorageEngine oneself
-    std::shared_ptr<MemTracker> _mem_tracker;
     // Count the memory consumption of segment compaction tasks.
     std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
     // This mem tracker is only for tracking memory use by segment meta data 
such as footer or index page.
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index c877a8d0ec..1a5183b080 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -73,7 +73,9 @@ 
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTE
                                    mem_consumption, Labels({{"type", 
"tablet_meta"}}));
 
 TabletManager::TabletManager(int32_t tablet_map_lock_shard_size)
-        : _mem_tracker(std::make_shared<MemTracker>("TabletManager")),
+        : _mem_tracker(std::make_shared<MemTracker>(
+                  "TabletManager", 
ExecEnv::GetInstance()->experimental_mem_tracker())),
+          _tablet_meta_mem_tracker(std::make_shared<MemTracker>("TabletMeta")),
           _tablets_shards_size(tablet_map_lock_shard_size),
           _tablets_shards_mask(tablet_map_lock_shard_size - 1) {
     CHECK_GT(_tablets_shards_size, 0);
@@ -206,6 +208,10 @@ Status 
TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id,
     tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
     tablet_map[tablet_id] = tablet;
     _add_tablet_to_partition(tablet);
+    // TODO: remove multiply 2 of tablet meta mem size
+    // Because table schema will copy in tablet, there will be double mem cost
+    // so here multiply 2
+    _tablet_meta_mem_tracker->consume(tablet->tablet_meta()->mem_size() * 2);
 
     VLOG_NOTICE << "add tablet to map successfully."
                 << " tablet_id=" << tablet_id;
@@ -488,6 +494,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId 
tablet_id, TReplicaId repl
     }
 
     to_drop_tablet->deregister_tablet_from_dir();
+    
_tablet_meta_mem_tracker->release(to_drop_tablet->tablet_meta()->mem_size() * 
2);
     return Status::OK();
 }
 
@@ -718,6 +725,7 @@ Status TabletManager::load_tablet_from_meta(DataDir* 
data_dir, TTabletId tablet_
                                             TSchemaHash schema_hash, const 
string& meta_binary,
                                             bool update_meta, bool force, bool 
restore,
                                             bool check_path) {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     TabletMetaSharedPtr tablet_meta(new TabletMeta());
     Status status = tablet_meta->deserialize(meta_binary);
     if (!status.ok()) {
@@ -800,6 +808,7 @@ Status TabletManager::load_tablet_from_meta(DataDir* 
data_dir, TTabletId tablet_
 Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
                                            SchemaHash schema_hash, const 
string& schema_hash_path,
                                            bool force, bool restore) {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     LOG(INFO) << "begin to load tablet from dir. "
               << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash
               << " path = " << schema_hash_path << " force = " << force << " 
restore = " << restore;
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 1bd62381cf..75c29bdac7 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -202,6 +202,7 @@ private:
 
     // trace the memory use by meta of tablet
     std::shared_ptr<MemTracker> _mem_tracker;
+    std::shared_ptr<MemTracker> _tablet_meta_mem_tracker;
 
     const int32_t _tablets_shards_size;
     const int32_t _tablets_shards_mask;
diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp
index 59a9cf16bc..121ee0891c 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -49,7 +49,7 @@ namespace doris {
 EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, 
std::vector<TTabletInfo>* tablet_infos)
         : _push_req(push_req), _tablet_infos(tablet_infos) {
     _mem_tracker = std::make_shared<MemTrackerLimiter>(
-            MemTrackerLimiter::Type::BATCHLOAD,
+            MemTrackerLimiter::Type::LOAD,
             fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", 
_push_req.push_type,
                         std::to_string(_push_req.tablet_id)));
 }
diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp
index 8bb6eb1f9e..ac23770c49 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -27,7 +27,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, 
TSchemaHash schema_h
                                        TVersion version, uint32_t* checksum)
         : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), 
_checksum(checksum) {
     _mem_tracker = std::make_shared<MemTrackerLimiter>(
-            MemTrackerLimiter::Type::CONSISTENCY,
+            MemTrackerLimiter::Type::LOAD,
             "EngineChecksumTask#tabletId=" + std::to_string(tablet_id));
 }
 
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index 59d46102b3..5ab627f317 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -455,8 +455,9 @@ DataStreamRecvr::DataStreamRecvr(
           _num_buffered_bytes(0),
           _profile(profile),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
-    _mem_tracker = std::make_unique<MemTracker>(
-            "DataStreamRecvr:" + print_id(_fragment_instance_id), _profile);
+    _mem_tracker =
+            std::make_unique<MemTracker>("DataStreamRecvr:" + 
print_id(_fragment_instance_id),
+                                         _profile, nullptr, "PeakMemoryUsage");
 
     // Create one queue per sender if is_merging is true.
     int num_queues = is_merging ? num_senders : 1;
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index df1495b750..19d9a90ad4 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -411,7 +411,8 @@ Status DataStreamSender::prepare(RuntimeState* state) {
     _profile = _pool->add(new RuntimeProfile(title.str()));
     SCOPED_TIMER(_profile->total_time_counter());
     _mem_tracker = std::make_unique<MemTracker>(
-            "DataStreamSender:" + print_id(state->fragment_instance_id()), 
_profile);
+            "DataStreamSender:" + print_id(state->fragment_instance_id()), 
_profile, nullptr,
+            "PeakMemoryUsage");
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     if (_part_type == TPartitionType::UNPARTITIONED || _part_type == 
TPartitionType::RANDOM) {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4bd9fd73af..a0a132cfc4 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -116,12 +116,10 @@ public:
         return nullptr;
     }
 
-    void set_orphan_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& 
orphan_tracker) {
-        _orphan_mem_tracker = orphan_tracker;
-        _orphan_mem_tracker_raw = orphan_tracker.get();
-    }
+    void init_mem_tracker();
     std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return 
_orphan_mem_tracker; }
     MemTrackerLimiter* orphan_mem_tracker_raw() { return 
_orphan_mem_tracker_raw; }
+    MemTrackerLimiter* experimental_mem_tracker() { return 
_experimental_mem_tracker.get(); }
     ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
     PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
     PriorityThreadPool* remote_scan_thread_pool() { return 
_remote_scan_thread_pool; }
@@ -208,6 +206,7 @@ private:
     // and the consumption of the orphan mem tracker is close to 0, but greater than 0.
     std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
     MemTrackerLimiter* _orphan_mem_tracker_raw;
+    std::shared_ptr<MemTrackerLimiter> _experimental_mem_tracker;
 
     // The following two thread pools are used in different scenarios.
     // _scan_thread_pool is a priority thread pool.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 735d8091ed..a9a1eac159 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -176,9 +176,7 @@ Status ExecEnv::_init_mem_env() {
     bool is_percent = false;
     std::stringstream ss;
     // 1. init mem tracker
-    _orphan_mem_tracker =
-            
std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan");
-    _orphan_mem_tracker_raw = _orphan_mem_tracker.get();
+    init_mem_tracker();
     thread_context()->thread_mem_tracker_mgr->init();
 #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && 
!defined(ADDRESS_SANITIZER) && \
         !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && 
!defined(USE_JEMALLOC)
@@ -289,6 +287,14 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size, 
int64_t capacity,
     _buffer_pool = new BufferPool(min_page_size, capacity, clean_pages_limit);
 }
 
+void ExecEnv::init_mem_tracker() {
+    _orphan_mem_tracker =
+            
std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan");
+    _orphan_mem_tracker_raw = _orphan_mem_tracker.get();
+    _experimental_mem_tracker = std::make_shared<MemTrackerLimiter>(
+            MemTrackerLimiter::Type::EXPERIMENTAL, "ExperimentalSet");
+}
+
 void ExecEnv::init_download_cache_buf() {
     std::unique_ptr<char[]> download_cache_buf(new 
char[config::download_cache_buffer_size]);
     memset(download_cache_buf.get(), 0, config::download_cache_buffer_size);
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index b44fe23335..6470f56f70 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -76,9 +76,8 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
     // it's not quite helpfull to reduce memory pressure.
     // In this case we need to pick multiple load channels to reduce memory more effectively.
     _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1;
-    _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
-    _mem_tracker_set = 
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
-                                                           
"LoadChannelMgrTrackerSet");
+    _mem_tracker =
+            std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD, 
"LoadChannelMgr");
     REGISTER_HOOK_METRIC(load_channel_mem_consumption,
                          [this]() { return _mem_tracker->consumption(); });
     _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
@@ -106,7 +105,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& 
params) {
             auto channel_mem_tracker = std::make_unique<MemTracker>(
                     fmt::format("LoadChannel#senderIp={}#loadID={}", 
params.sender_ip(),
                                 load_id.to_string()),
-                    nullptr, 
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker_set());
+                    ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
 #else
             auto channel_mem_tracker = 
std::make_unique<MemTracker>(fmt::format(
                     "LoadChannel#senderIp={}#loadID={}", params.sender_ip(), 
load_id.to_string()));
@@ -345,6 +344,8 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
         if (_soft_reduce_mem_in_progress) {
             _soft_reduce_mem_in_progress = false;
         }
+        // refresh mem tracker to avoid duplicate reduce
+        _refresh_mem_tracker_without_lock();
     }
     return;
 }
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index beaaf29a91..7dc0c779d4 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -60,16 +60,10 @@ public:
     Status cancel(const PTabletWriterCancelRequest& request);
 
     void refresh_mem_tracker() {
-        int64_t mem_usage = 0;
-        {
-            std::lock_guard<std::mutex> l(_lock);
-            for (auto& kv : _load_channels) {
-                mem_usage += kv.second->mem_consumption();
-            }
-        }
-        _mem_tracker->set_consumption(mem_usage);
+        std::lock_guard<std::mutex> l(_lock);
+        _refresh_mem_tracker_without_lock();
     }
-    MemTrackerLimiter* mem_tracker_set() { return _mem_tracker_set.get(); }
+    MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); }
 
 private:
     template <typename Request>
@@ -83,6 +77,16 @@ private:
 
     Status _start_bg_worker();
 
+    // lock should be held when calling this method
+    void _refresh_mem_tracker_without_lock() {
+        _mem_usage = 0;
+        for (auto& kv : _load_channels) {
+            _mem_usage += kv.second->mem_consumption();
+        }
+        THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - 
_mem_tracker->consumption(),
+                                       _mem_tracker.get());
+    }
+
 protected:
     // lock protect the load channel map
     std::mutex _lock;
@@ -91,9 +95,8 @@ protected:
     Cache* _last_success_channel = nullptr;
 
     // check the total load channel mem consumption of this Backend
-    std::unique_ptr<MemTracker> _mem_tracker;
-    // Associate load channel tracker and memtable tracker, avoid default association to Orphan tracker.
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
+    int64_t _mem_usage = 0;
+    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
     int64_t _load_hard_mem_limit = -1;
     int64_t _load_soft_mem_limit = -1;
     // By default, we try to reduce memory on the load channel with largest mem consumption,
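
The new _refresh_mem_tracker_without_lock() above sums per-channel consumption and applies only the delta to the limiter tracker, so repeated refreshes do not double count. A minimal self-contained sketch of that delta-transfer idea follows; ToyTracker and its plain consume() call are stand-ins for MemTrackerLimiter and the THREAD_MEM_TRACKER_TRANSFER_TO macro.

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Toy limiter tracker; consume() applies a signed delta to the running consumption.
    struct ToyTracker {
        int64_t consumption = 0;
        void consume(int64_t delta) { consumption += delta; }
    };

    int main() {
        ToyTracker load_channel_mgr_tracker;
        std::vector<int64_t> channel_mem = {100, 250, 50};  // per-LoadChannel consumption, bytes

        // Mirrors _refresh_mem_tracker_without_lock(): sum the channels, then apply only the
        // difference against the tracker's current value so repeated refreshes do not double count.
        auto refresh = [&]() {
            int64_t mem_usage = 0;
            for (int64_t m : channel_mem) mem_usage += m;
            load_channel_mgr_tracker.consume(mem_usage - load_channel_mgr_tracker.consumption);
        };

        refresh();
        std::cout << load_channel_mgr_tracker.consumption << "\n";  // 400
        channel_mem[1] = 300;
        refresh();
        std::cout << load_channel_mgr_tracker.consumption << "\n";  // 450, not 850
        return 0;
    }
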
diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index 34baeb9ff3..08582aafc6 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -41,7 +41,8 @@ struct TrackerGroup {
 // Multiple groups are used to reduce the impact of locks.
 static std::vector<TrackerGroup> mem_tracker_pool(1000);
 
-MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, 
MemTrackerLimiter* parent)
+MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, 
MemTrackerLimiter* parent,
+                       const std::string& profile_counter_name)
         : _label(label) {
     if (profile == nullptr) {
         _consumption = 
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
@@ -57,7 +58,15 @@ MemTracker::MemTracker(const std::string& label, 
RuntimeProfile* profile, MemTra
         // release().
         _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, 
TUnit::BYTES);
     }
+    bind_parent(parent);
+}
+
+MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : 
_label(label) {
+    _consumption = 
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
+    bind_parent(parent);
+}
 
+void MemTracker::bind_parent(MemTrackerLimiter* parent) {
     if (parent) {
         _parent_label = parent->label();
         _parent_group_num = parent->group_num();
@@ -96,7 +105,7 @@ void 
MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshot
                                      int64_t group_num, std::string 
parent_label) {
     std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock);
     for (auto tracker : mem_tracker_pool[group_num].trackers) {
-        if (tracker->parent_label() == parent_label) {
+        if (tracker->parent_label() == parent_label && tracker->consumption() 
!= 0) {
             snapshots->push_back(tracker->make_snapshot());
         }
     }
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index bf468043e1..0b4e54bb10 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -44,8 +44,9 @@ public:
     };
 
     // Creates and adds the tracker to the mem_tracker_pool.
-    MemTracker(const std::string& label, RuntimeProfile* profile = nullptr,
-               MemTrackerLimiter* parent = nullptr);
+    MemTracker(const std::string& label, RuntimeProfile* profile, 
MemTrackerLimiter* parent,
+               const std::string& profile_counter_name);
+    MemTracker(const std::string& label, MemTrackerLimiter* parent = nullptr);
     // For MemTrackerLimiter
     MemTracker() { _parent_group_num = -1; }
 
@@ -59,6 +60,7 @@ public:
 public:
     const std::string& label() const { return _label; }
     const std::string& parent_label() const { return _parent_label; }
+    const std::string& set_parent_label() const { return _parent_label; }
     // Returns the memory consumed in bytes.
     int64_t consumption() const { return _consumption->current_value(); }
     int64_t peak_consumption() const { return _consumption->value(); }
@@ -88,6 +90,8 @@ public:
     static const std::string COUNTER_NAME;
 
 protected:
+    void bind_parent(MemTrackerLimiter* parent);
+
     // label used in the make snapshot, not guaranteed unique.
     std::string _label;
 
@@ -95,6 +99,7 @@ protected:
 
     // Tracker is located in group num in mem_tracker_pool
     int64_t _parent_group_num = 0;
+    // Use _parent_label to correlate with parent limiter tracker.
     std::string _parent_label = "-";
 
     // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove.
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index aea4b5f806..1478a5704f 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -24,6 +24,7 @@
 
 #include "gutil/once.h"
 #include "runtime/fragment_mgr.h"
+#include "runtime/load_channel_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/pretty_printer.h"
@@ -96,9 +97,8 @@ MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const 
{
 
 void MemTrackerLimiter::refresh_global_counter() {
     std::unordered_map<Type, int64_t> type_mem_sum = {
-            {Type::GLOBAL, 0},     {Type::QUERY, 0},         {Type::LOAD, 0},
-            {Type::COMPACTION, 0}, {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0},
-            {Type::BATCHLOAD, 0},  {Type::CONSISTENCY, 0}};
+            {Type::GLOBAL, 0},        {Type::QUERY, 0}, {Type::LOAD, 0}, {Type::COMPACTION, 0},
+            {Type::SCHEMA_CHANGE, 0}, {Type::CLONE, 0}}; // No need refresh Type::EXPERIMENTAL
     for (unsigned i = 0; i < mem_tracker_limiter_pool.size(); ++i) {
         std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
         for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
@@ -195,26 +195,35 @@ void MemTrackerLimiter::print_log_usage(const 
std::string& msg) {
     }
 }
 
+std::string MemTrackerLimiter::log_process_usage_str(const std::string& msg, 
bool with_stacktrace) {
+    std::string detail = msg;
+    detail += "\nProcess Memory Summary:\n    " + 
MemTrackerLimiter::process_mem_log_str();
+    if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + get_stack_trace();
+    std::vector<MemTracker::Snapshot> snapshots;
+    MemTrackerLimiter::make_process_snapshots(&snapshots);
+    MemTrackerLimiter::make_type_snapshots(&snapshots, 
MemTrackerLimiter::Type::GLOBAL);
+
+    // Add additional tracker printed when memory exceeds limit.
+    snapshots.emplace_back(
+            
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()->make_snapshot());
+
+    detail += "\nMemory Tracker Summary:";
+    for (const auto& snapshot : snapshots) {
+        if (snapshot.label == "" && snapshot.parent_label == "") {
+            detail += "\n    " + MemTrackerLimiter::type_log_usage(snapshot);
+        } else if (snapshot.parent_label == "") {
+            detail += "\n    " + MemTrackerLimiter::log_usage(snapshot);
+        } else {
+            detail += "\n    " + MemTracker::log_usage(snapshot);
+        }
+    }
+    return detail;
+}
+
 void MemTrackerLimiter::print_log_process_usage(const std::string& msg, bool 
with_stacktrace) {
     if (MemTrackerLimiter::_enable_print_log_process_usage) {
         MemTrackerLimiter::_enable_print_log_process_usage = false;
-        std::string detail = msg;
-        detail += "\nProcess Memory Summary:\n    " + 
MemTrackerLimiter::process_mem_log_str();
-        if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + 
get_stack_trace();
-        std::vector<MemTracker::Snapshot> snapshots;
-        MemTrackerLimiter::make_process_snapshots(&snapshots);
-        MemTrackerLimiter::make_type_snapshots(&snapshots, 
MemTrackerLimiter::Type::GLOBAL);
-        detail += "\nMemory Tracker Summary:";
-        for (const auto& snapshot : snapshots) {
-            if (snapshot.label == "" && snapshot.parent_label == "") {
-                detail += "\n    " + 
MemTrackerLimiter::type_log_usage(snapshot);
-            } else if (snapshot.parent_label == "") {
-                detail += "\n    " + MemTrackerLimiter::log_usage(snapshot);
-            } else {
-                detail += "\n    " + MemTracker::log_usage(snapshot);
-            }
-        }
-        LOG(WARNING) << detail;
+        LOG(WARNING) << log_process_usage_str(msg, with_stacktrace);
     }
 }
 
@@ -239,7 +248,7 @@ Status 
MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const
     return Status::MemoryLimitExceeded(failed_msg);
 }
 
-int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) {
+int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, Type 
type) {
     std::priority_queue<std::pair<int64_t, std::string>,
                         std::vector<std::pair<int64_t, std::string>>,
                         std::greater<std::pair<int64_t, std::string>>>
@@ -252,16 +261,21 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t 
min_free_mem) {
         int64_t freed_mem = 0;
         while (!min_pq.empty()) {
             TUniqueId cancelled_queryid = 
label_to_queryid(min_pq.top().second);
+            if (cancelled_queryid == TUniqueId()) {
+                min_pq.pop();
+                continue;
+            }
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     cancelled_queryid, 
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
-                    fmt::format("Process has no memory available, cancel top 
memory usage query: "
-                                "query memory tracker <{}> consumption {}, 
backend {} "
+                    fmt::format("Process has no memory available, cancel top 
memory usage {}: "
+                                "{} memory tracker <{}> consumption {}, 
backend {} "
                                 "process memory used {} exceed limit {} or sys 
mem available {} "
                                 "less than low water mark {}. Execute again 
after enough memory, "
                                 "details see be.INFO.",
-                                min_pq.top().second, 
print_bytes(min_pq.top().first),
-                                BackendOptions::get_localhost(), 
PerfCounters::get_vm_rss_str(),
-                                MemInfo::mem_limit_str(), 
MemInfo::sys_mem_available_str(),
+                                TypeString[type], TypeString[type], 
min_pq.top().second,
+                                print_bytes(min_pq.top().first), 
BackendOptions::get_localhost(),
+                                PerfCounters::get_vm_rss_str(), 
MemInfo::mem_limit_str(),
+                                MemInfo::sys_mem_available_str(),
                                 
print_bytes(MemInfo::sys_mem_available_low_water_mark())));
 
             freed_mem += min_pq.top().first;
@@ -270,7 +284,8 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t 
min_free_mem) {
             min_pq.pop();
         }
         if (!usage_strings.empty()) {
-            LOG(INFO) << "Process GC Free Top Memory Usage Query: " << 
join(usage_strings, ",");
+            LOG(INFO) << "Process GC Free Top Memory Usage " << 
TypeString[type] << ": "
+                      << join(usage_strings, ",");
         }
         return freed_mem;
     };
@@ -278,7 +293,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t 
min_free_mem) {
     for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
         std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
         for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
-            if (tracker->type() == Type::QUERY) {
+            if (tracker->type() == type) {
                 if (tracker->consumption() > min_free_mem) {
                     std::priority_queue<std::pair<int64_t, std::string>,
                                         std::vector<std::pair<int64_t, 
std::string>>,
@@ -304,7 +319,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t 
min_free_mem) {
     return cancel_top_query(min_pq);
 }
 
-int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
+int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, 
Type type) {
     std::priority_queue<std::pair<int64_t, std::string>,
                         std::vector<std::pair<int64_t, std::string>>,
                         std::greater<std::pair<int64_t, std::string>>>
@@ -314,7 +329,7 @@ int64_t 
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
     for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
         std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
         for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
-            if (tracker->type() == Type::QUERY) {
+            if (tracker->type() == type) {
                 int64_t overcommit_ratio =
                        (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000;
                 if (overcommit_ratio == 0) { // Small query does not cancel
@@ -326,6 +341,11 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
         }
     }
 
+    // Minor gc does not cancel when there is only one query.
+    if (query_consumption.size() <= 1) {
+        return 0;
+    }
+
     std::priority_queue<std::pair<int64_t, std::string>> max_pq;
     // Min-heap to Max-heap.
     while (!min_pq.empty()) {
@@ -337,18 +357,23 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
     int64_t freed_mem = 0;
     while (!max_pq.empty()) {
         TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second);
+        if (cancelled_queryid == TUniqueId()) {
+            max_pq.pop();
+            continue;
+        }
         int64_t query_mem = query_consumption[max_pq.top().second];
         ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                 cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
-                fmt::format("Process has no memory available, cancel top memory usage query: "
-                            "query memory tracker <{}> consumption {}, backend {} "
-                            "process memory used {} exceed limit {} or sys mem available {} "
-                            "less than low water mark {}. Execute again after enough memory, "
+                fmt::format("Process has less memory, cancel top memory overcommit {}: "
+                            "{} memory tracker <{}> consumption {}, backend {} "
+                            "process memory used {} exceed soft limit {} or sys mem available {} "
+                            "less than warning water mark {}. Execute again after enough memory, "
                             "details see be.INFO.",
-                            max_pq.top().second, print_bytes(query_mem),
-                            BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
-                            MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(),
-                            print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+                            TypeString[type], TypeString[type], max_pq.top().second,
+                            print_bytes(query_mem), BackendOptions::get_localhost(),
+                            PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(),
+                            MemInfo::sys_mem_available_str(),
+                            print_bytes(MemInfo::sys_mem_available_warning_water_mark())));
 
         usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}",
                                             max_pq.top().second, query_mem, max_pq.top().first));
@@ -359,7 +384,8 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
         max_pq.pop();
     }
     if (!usage_strings.empty()) {
-        LOG(INFO) << "Process GC Free Top Memory Overcommit Query: " << join(usage_strings, ",");
+        LOG(INFO) << "Process GC Free Top Memory Overcommit " << TypeString[type] << ": "
+                  << join(usage_strings, ",");
     }
     return freed_mem;
 }
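
The two functions above share one selection pattern: candidate trackers are kept in a min-heap keyed by consumption so that only the largest consumers needed to cover min_free_mem are retained, then cancelled largest-first. A minimal standalone sketch of that pattern, with illustrative types and a hypothetical pick_top_consumers helper rather than the Doris tracker classes (the real code walks MemTrackerLimiter groups and differs in detail):

    #include <cstdint>
    #include <queue>
    #include <string>
    #include <utility>
    #include <vector>

    // Sketch only: keep just enough of the largest (consumption, label) pairs to
    // cover min_free_mem.
    std::vector<std::pair<int64_t, std::string>> pick_top_consumers(
            const std::vector<std::pair<int64_t, std::string>>& consumers, int64_t min_free_mem) {
        std::priority_queue<std::pair<int64_t, std::string>,
                            std::vector<std::pair<int64_t, std::string>>,
                            std::greater<std::pair<int64_t, std::string>>>
                min_pq; // min-heap: the smallest kept candidate is always on top
        int64_t kept_mem = 0;
        for (const auto& c : consumers) {
            min_pq.push(c);
            kept_mem += c.first;
            // Drop the smallest candidates while the remainder still covers the target.
            while (!min_pq.empty() && kept_mem - min_pq.top().first > min_free_mem) {
                kept_mem -= min_pq.top().first;
                min_pq.pop();
            }
        }
        std::vector<std::pair<int64_t, std::string>> picked;
        while (!min_pq.empty()) {
            picked.push_back(min_pq.top());
            min_pq.pop();
        }
        return picked; // ascending by consumption; cancel from the back for largest-first
    }
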
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 617a7ffdee..0b66a5c7fb 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -46,8 +46,8 @@ public:
         COMPACTION = 3,    // Count the memory consumption of all Base and Cumulative tasks.
         SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks.
         CLONE = 5, // Count the memory consumption of all EngineCloneTask. Note: Memory that does not contain make/release snapshots.
-        BATCHLOAD = 6,  // Count the memory consumption of all EngineBatchLoadTask.
-        CONSISTENCY = 7 // Count the memory consumption of all EngineChecksumTask.
+        EXPERIMENTAL =
+                6 // Experimental memory statistics, usually inaccurate, used for debugging, and expect to add other types in the future.
     };
 
     inline static std::unordered_map<Type, std::shared_ptr<RuntimeProfile::HighWaterMarkCounter>>
@@ -63,14 +63,11 @@ public:
                            std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
                           {Type::CLONE,
                            std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
-                          {Type::BATCHLOAD,
-                           std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)},
-                          {Type::CONSISTENCY,
+                          {Type::EXPERIMENTAL,
                            std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}};
 
-    inline static const std::string TypeString[] = {"global",     "query",         "load",
-                                                    "compaction", "schema_change", "clone",
-                                                    "batch_load", "consistency"};
+    inline static const std::string TypeString[] = {
+            "global", "query", "load", "compaction", "schema_change", "clone", "experimental"};
 
 public:
     // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics.
@@ -107,7 +104,8 @@ public:
     int64_t limit() const { return _limit; }
     bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); }
 
-    Status check_limit(int64_t bytes);
+    Status check_limit(int64_t bytes = 0);
+    bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; }
 
     // Returns the maximum consumption that can be made without exceeding the limit on
     // this tracker limiter.
@@ -137,6 +135,7 @@ public:
     void print_log_usage(const std::string& msg);
     void enable_print_log_usage() { _enable_print_log_usage = true; }
     static void enable_print_log_process_usage() { _enable_print_log_process_usage = true; }
+    static std::string log_process_usage_str(const std::string& msg, bool with_stacktrace = true);
     static void print_log_process_usage(const std::string& msg, bool with_stacktrace = true);
 
     // Log the memory usage when memory limit is exceeded.
@@ -146,12 +145,21 @@ public:
                                        int64_t failed_allocation_size = 0);
 
     // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed.
-    static int64_t free_top_memory_query(int64_t min_free_mem);
+    static int64_t free_top_memory_query(int64_t min_free_mem, Type type = Type::QUERY);
+    static int64_t free_top_memory_load(int64_t min_free_mem) {
+        return free_top_memory_query(min_free_mem, Type::LOAD);
+    }
     // Start canceling from the query with the largest memory overcommit ratio until the memory
     // of min_free_mem size is freed.
-    static int64_t free_top_overcommit_query(int64_t min_free_mem);
+    static int64_t free_top_overcommit_query(int64_t min_free_mem, Type type = Type::QUERY);
+    static int64_t free_top_overcommit_load(int64_t min_free_mem) {
+        return free_top_overcommit_query(min_free_mem, Type::LOAD);
+    }
     // only for Type::QUERY or Type::LOAD.
     static TUniqueId label_to_queryid(const std::string& label) {
+        if (label.rfind("Query#Id=", 0) != 0 && label.rfind("Load#Id=", 0) != 0) {
+            return TUniqueId();
+        }
         auto queryid = split(label, "#Id=")[1];
         TUniqueId querytid;
         parse_id(queryid, &querytid);
@@ -160,12 +168,14 @@ public:
 
     static std::string process_mem_log_str() {
         return fmt::format(
-                "physical memory {}, process memory used {} limit {}, sys mem available {} low "
-                "water mark {}, refresh interval memory growth {} B",
+                "OS physical memory {}. Process memory usage {}, limit {}, soft limit {}. Sys "
+                "available memory {}, low water mark {}, warning water mark {}. Refresh interval "
+                "memory growth {} B",
                 PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
                 PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
-                MemInfo::sys_mem_available_str(),
+                MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(),
                 PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
+                PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES),
                 MemInfo::refresh_interval_memory_growth);
     }
 
@@ -265,7 +275,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
         return false;
     }
 
-    if (_limit < 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) {
+    if (_limit < 0 || (is_overcommit_tracker() && config::enable_query_memroy_overcommit)) {
         _consumption->add(bytes); // No limit at this tracker.
     } else {
         if (!_consumption->try_add(bytes, _limit)) {
@@ -282,7 +292,7 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
     if (sys_mem_exceed_limit_check(bytes)) {
         return Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes));
     }
-    if (bytes <= 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) {
+    if (bytes <= 0 || (is_overcommit_tracker() && config::enable_query_memroy_overcommit)) {
         return Status::OK();
     }
     if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
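
label_to_queryid now parses only labels of the form "Query#Id=<id>" or "Load#Id=<id>" and returns an empty TUniqueId for anything else, which the GC loops above skip. A standalone sketch of that guard, with std::optional standing in for the empty-TUniqueId sentinel and a hypothetical helper name:

    #include <optional>
    #include <string>

    // Sketch: extract the id suffix from a "Query#Id=<id>" / "Load#Id=<id>" tracker
    // label; any other label yields std::nullopt (the patch returns TUniqueId()).
    std::optional<std::string> label_to_query_id_str(const std::string& label) {
        if (label.rfind("Query#Id=", 0) != 0 && label.rfind("Load#Id=", 0) != 0) {
            return std::nullopt; // not a query/load tracker label
        }
        return label.substr(label.find("#Id=") + 4); // "<id>", later parsed into a TUniqueId
    }
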
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index b59cba4896..020b6ae9e4 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -102,7 +102,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
     _runtime_state->set_tracer(std::move(tracer));
 
     SCOPED_ATTACH_TASK(_runtime_state.get());
-    _runtime_state->init_scanner_mem_trackers();
     _runtime_state->runtime_filter_mgr()->init();
     _runtime_state->set_be_number(request.backend_num);
     if (request.__isset.backend_id) {
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index ce851c119a..a40a2ecc02 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -45,7 +45,8 @@ RuntimeFilterMgr::~RuntimeFilterMgr() {}
 
 Status RuntimeFilterMgr::init() {
     DCHECK(_state->query_mem_tracker() != nullptr);
-    _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr");
+    _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
+                                            ExecEnv::GetInstance()->experimental_mem_tracker());
     return Status::OK();
 }
 
@@ -161,7 +162,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
                                                 const TQueryOptions& query_options) {
     _query_id = query_id;
     _fragment_instance_id = fragment_instance_id;
-    _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity");
+    _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity",
+                                                ExecEnv::GetInstance()->experimental_mem_tracker());
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
         int filter_id = filterid_to_desc.first;
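
Both RuntimeFilter trackers are now created with the process-level experimental tracker as a parent, so their consumption rolls up into the Type::EXPERIMENTAL bucket instead of floating free. A simplified, hypothetical illustration of such a parent/child roll-up (not the Doris MemTracker API):

    #include <atomic>
    #include <cstdint>
    #include <string>
    #include <utility>

    // Hypothetical minimal tracker: bytes reported to a child are also propagated
    // to its parent, so per-component usage aggregates into one bucket.
    class SimpleTracker {
    public:
        explicit SimpleTracker(std::string name, SimpleTracker* parent = nullptr)
                : _name(std::move(name)), _parent(parent) {}

        void consume(int64_t bytes) {
            _consumption.fetch_add(bytes, std::memory_order_relaxed);
            if (_parent != nullptr) _parent->consume(bytes);
        }
        void release(int64_t bytes) { consume(-bytes); }
        int64_t consumption() const { return _consumption.load(std::memory_order_relaxed); }

    private:
        std::string _name;
        SimpleTracker* _parent;
        std::atomic<int64_t> _consumption{0};
    };

    // Usage sketch: a "RuntimeFilterMgr" child reporting into an "experimental" parent.
    //   SimpleTracker experimental("experimental");
    //   SimpleTracker rf_mgr("RuntimeFilterMgr", &experimental);
    //   rf_mgr.consume(1024); // experimental.consumption() now also reflects 1024 bytes
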
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index b18671a9b8..f61db045a8 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -216,8 +216,6 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
 Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
     _query_mem_tracker = std::make_shared<MemTrackerLimiter>(
             MemTrackerLimiter::Type::QUERY, fmt::format("TestQuery#Id={}", print_id(query_id)));
-    _scanner_mem_tracker =
-            std::make_shared<MemTracker>(fmt::format("TestScanner#QueryId={}", print_id(query_id)));
     return Status::OK();
 }
 
@@ -281,7 +279,8 @@ Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
 Status RuntimeState::check_query_state(const std::string& msg) {
     // TODO: it would be nice if this also checked for cancellation, but doing so breaks
     // cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached.
-    if (thread_context()->thread_mem_tracker()->limit_exceeded()) {
+    if (thread_context()->thread_mem_tracker()->limit_exceeded() &&
+        !config::enable_query_memroy_overcommit) {
         RETURN_LIMIT_EXCEEDED(this, msg);
     }
     return query_status();
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f68de8d714..ca6dc42c4e 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -76,11 +76,6 @@ public:
     Status init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options,
                 const TQueryGlobals& query_globals, ExecEnv* exec_env);
 
-    // after SCOPED_ATTACH_TASK;
-    void init_scanner_mem_trackers() {
-        _scanner_mem_tracker = std::make_shared<MemTracker>(
-                fmt::format("Scanner#QueryId={}", print_id(_query_id)));
-    }
     // for ut and non-query.
     Status init_mem_trackers(const TUniqueId& query_id = TUniqueId());
 
@@ -115,7 +110,6 @@ public:
     const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
     ExecEnv* exec_env() { return _exec_env; }
     std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return _query_mem_tracker; }
-    std::shared_ptr<MemTracker> scanner_mem_tracker() { return _scanner_mem_tracker; }
     ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; }
 
     void set_fragment_root_id(PlanNodeId id) {
@@ -418,8 +412,6 @@ private:
     static const int DEFAULT_BATCH_SIZE = 2048;
 
     std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
-    // Count the memory consumption of Scanner
-    std::shared_ptr<MemTracker> _scanner_mem_tracker;
 
     // put runtime state before _obj_pool, so that it will be deconstructed after
     // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing.
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index cccf77c060..b7e68674e8 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -47,6 +47,7 @@ int64_t MemInfo::_s_physical_mem = -1;
 int64_t MemInfo::_s_mem_limit = -1;
 std::string MemInfo::_s_mem_limit_str = "";
 int64_t MemInfo::_s_soft_mem_limit = -1;
+std::string MemInfo::_s_soft_mem_limit_str = "";
 
 int64_t MemInfo::_s_allocator_cache_mem = 0;
 std::string MemInfo::_s_allocator_cache_mem_str = "";
@@ -89,44 +90,66 @@ void MemInfo::refresh_allocator_mem() {
 #endif
 }
 
-void MemInfo::process_minor_gc() {
+void MemInfo::process_cache_gc(int64_t& freed_mem) {
     // TODO, free more cache, and should free a certain percentage of capacity, not all.
+    freed_mem += ChunkAllocator::instance()->mem_consumption();
+    ChunkAllocator::instance()->clear();
+    freed_mem +=
+            StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+}
+
+// step1: free all cache
+// step2: free top overcommit query, if enable query memroy overcommit
+bool MemInfo::process_minor_gc() {
     int64_t freed_mem = 0;
     Defer defer {[&]() {
         LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes", 
freed_mem);
     }};
 
-    freed_mem += ChunkAllocator::instance()->mem_consumption();
-    ChunkAllocator::instance()->clear();
+    MemInfo::process_cache_gc(freed_mem);
     if (freed_mem > _s_process_minor_gc_size) {
-        return;
+        return true;
     }
-    freed_mem +=
-            StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
-    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
     if (config::enable_query_memroy_overcommit) {
         freed_mem +=
                 MemTrackerLimiter::free_top_overcommit_query(_s_process_minor_gc_size - freed_mem);
     }
+    if (freed_mem > _s_process_minor_gc_size) {
+        return true;
+    }
+    return false;
 }
 
-void MemInfo::process_full_gc() {
+// step1: free all cache
+// step2: free top memory query
+// step3: free top overcommit load, load retries are more expensive, So cancel at the end.
+// step4: free top memory load
+bool MemInfo::process_full_gc() {
     int64_t freed_mem = 0;
     Defer defer {
             [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes", freed_mem); }};
 
-    freed_mem +=
-            StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
-    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+    MemInfo::process_cache_gc(freed_mem);
     if (freed_mem > _s_process_full_gc_size) {
-        return;
+        return true;
     }
-    freed_mem += ChunkAllocator::instance()->mem_consumption();
-    ChunkAllocator::instance()->clear();
+    freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem);
     if (freed_mem > _s_process_full_gc_size) {
-        return;
+        return true;
     }
-    freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem);
+    if (config::enable_query_memroy_overcommit) {
+        freed_mem +=
+                MemTrackerLimiter::free_top_overcommit_load(_s_process_full_gc_size - freed_mem);
+        if (freed_mem > _s_process_full_gc_size) {
+            return true;
+        }
+    }
+    freed_mem += MemTrackerLimiter::free_top_memory_load(_s_process_full_gc_size - freed_mem);
+    if (freed_mem > _s_process_full_gc_size) {
+        return true;
+    }
+    return false;
 }
 
 #ifndef __APPLE__
@@ -186,6 +209,7 @@ void MemInfo::init() {
     }
     _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
     _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac;
+    _s_soft_mem_limit_str = PrettyPrinter::print(_s_soft_mem_limit, TUnit::BYTES);
 
     _s_process_minor_gc_size =
             ParseUtil::parse_mem_spec(config::process_minor_gc_size, -1, _s_mem_limit, &is_percent);
@@ -222,7 +246,7 @@ void MemInfo::init() {
             config::max_sys_mem_available_low_water_mark_bytes);
     int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem * 0.01, 0);
     _s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0);
-    _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1 * 2;
+    _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1;
 
     LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, 
TUnit::BYTES)
               << ", Mem Limit: " << _s_mem_limit_str
@@ -247,6 +271,7 @@ void MemInfo::init() {
     _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent);
     _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
     _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac;
+    _s_soft_mem_limit_str = PrettyPrinter::print(_s_soft_mem_limit, TUnit::BYTES);
 
     LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, 
TUnit::BYTES);
     _s_initialized = true;
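
process_minor_gc and process_full_gc now run as a staged cascade: each stage frees what it can and the function returns as soon as the freed total exceeds the GC target, so the more expensive cancellation stages are skipped when clearing caches is already enough. A condensed sketch of that control flow, with the real stages (process_cache_gc, free_top_memory_query, free_top_overcommit_load, free_top_memory_load) replaced by stand-in callbacks:

    #include <cstdint>
    #include <functional>
    #include <vector>

    // Sketch of the early-exit cascade: run stages in order, stop once enough
    // memory has been reclaimed, and report whether the target was reached.
    bool staged_gc(int64_t target_bytes,
                   const std::vector<std::function<int64_t(int64_t)>>& stages) {
        int64_t freed = 0;
        for (const auto& stage : stages) {
            freed += stage(target_bytes - freed); // each stage only needs the remainder
            if (freed > target_bytes) {
                return true; // enough reclaimed; skip the costlier stages
            }
        }
        return false; // caller may then log and fall back to cancelling more work
    }
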
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index bd76c6124c..2cb17eb6b8 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -111,11 +111,16 @@ public:
         DCHECK(_s_initialized);
         return _s_soft_mem_limit;
     }
+    static inline std::string soft_mem_limit_str() {
+        DCHECK(_s_initialized);
+        return _s_soft_mem_limit_str;
+    }
 
     static std::string debug_string();
 
-    static void process_minor_gc();
-    static void process_full_gc();
+    static void process_cache_gc(int64_t& freed_mem);
+    static bool process_minor_gc();
+    static bool process_full_gc();
 
     // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process,
     // avoid multiple threads starting at the same time and causing OOM.
@@ -127,6 +132,7 @@ private:
     static int64_t _s_mem_limit;
     static std::string _s_mem_limit_str;
     static int64_t _s_soft_mem_limit;
+    static std::string _s_soft_mem_limit_str;
 
     static int64_t _s_allocator_cache_mem;
     static std::string _s_allocator_cache_mem_str;
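
The soft limit and warning water mark added here are simple derived values: the soft limit scales mem_limit by soft_mem_limit_frac, and the warning water mark now sits one p1 step above the low water mark rather than two. A small sketch of that arithmetic; p1 is taken as an input because its exact formula is computed earlier in MemInfo::init and not shown in these hunks, and the parameter names are stand-ins for the corresponding configs:

    #include <algorithm>
    #include <cstdint>

    struct DerivedLimits {
        int64_t soft_mem_limit;
        int64_t low_water_mark;
        int64_t warning_water_mark;
    };

    // Illustrative only: mirrors the assignments in MemInfo::init touched by this patch.
    DerivedLimits derive_limits(int64_t mem_limit, double soft_mem_limit_frac,
                                int64_t physical_mem, int64_t p1, int64_t vm_min_free_bytes) {
        DerivedLimits d;
        d.soft_mem_limit = static_cast<int64_t>(mem_limit * soft_mem_limit_frac);
        // p2: headroom the kernel already keeps free (derived from vm.min_free_kbytes).
        int64_t p2 = std::max<int64_t>(
                vm_min_free_bytes - static_cast<int64_t>(physical_mem * 0.01), 0);
        d.low_water_mark = std::max<int64_t>(p1 - p2, 0);
        d.warning_water_mark = d.low_water_mark + p1; // was low_water_mark + p1 * 2 before
        return d;
    }
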
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 8236f61fb6..9f967a8190 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -348,7 +348,6 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status HashJoinNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // Build phase
     _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true);
     runtime_profile()->add_child(_build_phase_profile, false, nullptr);
@@ -635,7 +634,6 @@ Status HashJoinNode::open(RuntimeState* state) {
         RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->open(state));
     }
     RETURN_IF_ERROR(VJoinNodeBase::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     return Status::OK();
 }
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp
index 9befa4f4bb..528854537b 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -171,7 +171,6 @@ Status VJoinNodeBase::init(const TPlanNode& tnode, RuntimeState* state) {
 Status VJoinNodeBase::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::open");
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
 
     std::promise<Status> thread_status;
@@ -209,7 +208,6 @@ void VJoinNodeBase::_reset_tuple_is_null_column() {
 void VJoinNodeBase::_probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::_hash_table_build_thread");
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     status->set_value(child(0)->open(state));
 }
 
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 7c3f29c854..ce6ff568d2 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -104,7 +104,6 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
     _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT);
@@ -206,7 +205,6 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_TIMER(_probe_timer);
     RETURN_IF_CANCELLED(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     if (_is_output_left_side_only) {
         RETURN_IF_ERROR(get_left_side(state, &_left_block));
@@ -708,7 +706,6 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
         RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->open(state));
     }
     RETURN_IF_ERROR(VJoinNodeBase::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     // We can close the right child to release its resources because its input has been
     // fully consumed.
diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp
index bd6f676fe5..0ca61e0652 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_es_scan_node.cpp
@@ -76,7 +76,6 @@ Status NewEsScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status NewEsScanNode::prepare(RuntimeState* state) {
     VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
     RETURN_IF_ERROR(VScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     if (_tuple_desc == nullptr) {
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
index eaa511f40d..19fd93b9d3 100644
--- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
@@ -37,7 +37,6 @@ std::string NewJdbcScanNode::get_name() {
 Status NewJdbcScanNode::prepare(RuntimeState* state) {
     VLOG_CRITICAL << "VNewJdbcScanNode::Prepare";
     RETURN_IF_ERROR(VScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
index 1a7296a866..dbbf57a120 100644
--- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
@@ -38,7 +38,6 @@ std::string NewOdbcScanNode::get_name() {
 Status NewOdbcScanNode::prepare(RuntimeState* state) {
     VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
     RETURN_IF_ERROR(VScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index add047fc6a..c113a5e6f2 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -45,7 +45,6 @@ Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
 
 Status NewOlapScanNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 63c8fa1355..c645cbc62d 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -191,7 +191,6 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
 void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx,
                                      VScanner* scanner) {
     SCOPED_ATTACH_TASK(scanner->runtime_state());
-    SCOPED_CONSUME_MEM_TRACKER(scanner->runtime_state()->scanner_mem_tracker());
     Thread::set_self_name("_scanner_scan");
     scanner->update_wait_worker_timer();
     scanner->start_scan_cpu_timer();
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index e0ba630f67..57178d4d92 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -71,8 +71,6 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status VScanNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
-
     RETURN_IF_ERROR(_init_profile());
 
     // init profile for runtime filter
@@ -89,9 +87,7 @@ Status VScanNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
-    RETURN_IF_ERROR(_acquire_runtime_filter());
     RETURN_IF_ERROR(_process_conjuncts());
     if (_eos) {
         return Status::OK();
@@ -112,7 +108,6 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool*
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VScanNode::get_next");
     SCOPED_TIMER(_get_next_timer);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     if (state->is_cancelled()) {
         _scanner_ctx->set_status_on_error(Status::Cancelled("query cancelled"));
         return _scanner_ctx->status();
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index e93fc89591..277f1f6691 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -292,7 +292,6 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
 Status AggregationNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
     _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
     _exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
@@ -449,7 +448,6 @@ Status AggregationNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "AggregationNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
 
@@ -493,7 +491,6 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
 Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "AggregationNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     if (_is_streaming_preagg) {
         RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 55d98d47ab..64b6a2e7cb 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -149,7 +149,6 @@ Status VAnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status VAnalyticEvalNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor));
     _mem_pool.reset(new MemPool(mem_tracker()));
     _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime");
@@ -217,7 +216,6 @@ Status VAnalyticEvalNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(child(0)->open(state));
     RETURN_IF_ERROR(VExpr::open(_partition_by_eq_expr_ctxs, state));
@@ -255,7 +253,6 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
                                  "VAnalyticEvalNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
 
     if (_input_eos && _output_block_index == _input_blocks.size()) {
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp
index f164b5a93d..dfa82d893c 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -59,7 +59,6 @@ Status VBrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status VBrokerScanNode::prepare(RuntimeState* state) {
     VLOG_QUERY << "VBrokerScanNode prepare";
     RETURN_IF_ERROR(ScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // get tuple desc
     _runtime_state = state;
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
@@ -85,7 +84,6 @@ Status VBrokerScanNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VBrokerScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
 
     RETURN_IF_ERROR(start_scanners());
@@ -109,7 +107,6 @@ Status VBrokerScanNode::start_scanners() {
 Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VBrokerScanNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // check if CANCELLED.
     if (state->is_cancelled()) {
         std::unique_lock<std::mutex> l(_batch_queue_lock);
@@ -273,7 +270,6 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
 void VBrokerScanNode::scanner_worker(int start_idx, int length) {
     START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VBrokerScanNode::scanner_worker");
     SCOPED_ATTACH_TASK(_runtime_state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     Thread::set_self_name("vbroker_scanner");
     Status status = Status::OK();
     ScannerCounter counter;
diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp
index c28ca147f8..f595c6fea3 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -50,7 +50,6 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status VExchangeNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     DCHECK_GT(_num_senders, 0);
     _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
     _stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
@@ -67,7 +66,6 @@ Status VExchangeNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExchangeNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     if (_is_merging) {
         RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
@@ -85,7 +83,6 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e
 Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next");
     SCOPED_TIMER(runtime_profile()->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     auto status = _stream_recvr->get_next(block, eos);
     if (block != nullptr) {
         if (!_is_merging) {
diff --git a/be/src/vec/exec/vmysql_scan_node.cpp b/be/src/vec/exec/vmysql_scan_node.cpp
index d8148d4b9d..fb350fb477 100644
--- a/be/src/vec/exec/vmysql_scan_node.cpp
+++ b/be/src/vec/exec/vmysql_scan_node.cpp
@@ -50,7 +50,6 @@ Status VMysqlScanNode::prepare(RuntimeState* state) {
     }
 
     RETURN_IF_ERROR(ScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // get tuple desc
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
 
@@ -104,7 +103,6 @@ Status VMysqlScanNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VMysqlScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     VLOG_CRITICAL << "MysqlScanNode::Open";
 
     if (!_is_init) {
diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp
index c0b1491c50..31d3fc60ef 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -123,7 +123,6 @@ Status VSchemaScanNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     if (_scanner_param.user) {
         TSetSessionParams param;
@@ -146,7 +145,6 @@ Status VSchemaScanNode::prepare(RuntimeState* state) {
     }
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSchemaScanNode::prepare");
     RETURN_IF_ERROR(ScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     // new one mem pool
     _tuple_pool.reset(new (std::nothrow) MemPool());
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 1324c5f7d5..8ce115813c 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -123,7 +123,6 @@ Status VSetOperationNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     // open result expr lists.
     for (const std::vector<VExprContext*>& exprs : _child_expr_lists) {
         RETURN_IF_ERROR(VExpr::open(exprs, state));
@@ -135,7 +134,6 @@ Status VSetOperationNode::open(RuntimeState* state) {
 Status VSetOperationNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
     _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
 
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 37e120b015..f2462efae4 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -68,7 +68,6 @@ Status VSortNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     _runtime_profile->add_info_string("TOP-N", _limit == -1 ? "false" : "true");
     RETURN_IF_ERROR(ExecNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor));
     return Status::OK();
 }
@@ -77,7 +76,6 @@ Status VSortNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSortNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("vsort, while open."));
@@ -113,7 +111,6 @@ Status VSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
 Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VSortNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     RETURN_IF_ERROR(_sorter->get_next(state, block, eos));
     reached_limit(block, eos);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 4c6c3a57bc..c8c2f5ecd4 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -270,8 +270,9 @@ VDataStreamRecvr::VDataStreamRecvr(
           _profile(profile),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
     // DataStreamRecvr may be destructed after the instance execution thread ends.
-    _mem_tracker = std::make_unique<MemTracker>(
-            "VDataStreamRecvr:" + print_id(_fragment_instance_id), _profile);
+    _mem_tracker =
+            std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id),
+                                         _profile, nullptr, "PeakMemoryUsage");
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     // Create one queue per sender if is_merging is true.
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 9ab836252d..0bf10c4fd2 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -425,7 +425,8 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
     _profile = _pool->add(new RuntimeProfile(title));
     SCOPED_TIMER(_profile->total_time_counter());
     _mem_tracker = std::make_unique<MemTracker>(
-            "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile);
+            "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile, nullptr,
+            "PeakMemoryUsage");
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp
index 99a338a5aa..cd6499831f 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -28,10 +28,7 @@
 #include "util/mem_info.h"
 
 int main(int argc, char** argv) {
-    std::shared_ptr<doris::MemTrackerLimiter> orphan_mem_tracker =
-            std::make_shared<doris::MemTrackerLimiter>(doris::MemTrackerLimiter::Type::GLOBAL,
-                                                       "Orphan");
-    doris::ExecEnv::GetInstance()->set_orphan_mem_tracker(orphan_mem_tracker);
+    doris::ExecEnv::GetInstance()->init_mem_tracker();
     doris::thread_context()->thread_mem_tracker_mgr->init();
     doris::TabletSchemaCache::create_global_schema_cache();
     doris::StoragePageCache::create_global_cache(1 << 30, 10);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
