This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 32a029d9dc [enhancement](memtracker) Refactor load channel + memtable 
mem tracker (#13795)
32a029d9dc is described below

commit 32a029d9dcd18828ecb9f7da261abe8de10a7bae
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Thu Nov 3 09:47:12 2022 +0800

    [enhancement](memtracker) Refactor load channel + memtable mem tracker 
(#13795)
---
 be/src/common/config.h                          | 12 +-----
 be/src/exec/blocking_join_node.cpp              |  2 +-
 be/src/exec/broker_scan_node.cpp                |  2 +-
 be/src/exec/es_http_scan_node.cpp               |  2 +-
 be/src/exec/except_node.cpp                     |  2 +-
 be/src/exec/exec_node.cpp                       |  2 +-
 be/src/exec/exec_node.h                         |  3 +-
 be/src/exec/hash_join_node.cpp                  |  2 +-
 be/src/exec/intersect_node.cpp                  |  2 +-
 be/src/exec/olap_scan_node.cpp                  |  9 ++---
 be/src/exec/olap_scan_node.h                    |  2 +-
 be/src/exec/olap_scanner.cpp                    |  8 ++--
 be/src/exec/olap_scanner.h                      |  5 ++-
 be/src/olap/delta_writer.cpp                    | 53 +++++++++++++++----------
 be/src/olap/delta_writer.h                      | 24 +++++------
 be/src/olap/memtable.cpp                        | 38 +++++++++---------
 be/src/olap/memtable.h                          | 25 +++++++-----
 be/src/olap/tablet_manager.cpp                  |  2 -
 be/src/runtime/load_channel.cpp                 | 12 +++---
 be/src/runtime/load_channel.h                   | 29 ++++++++++----
 be/src/runtime/load_channel_mgr.cpp             | 21 ++++------
 be/src/runtime/load_channel_mgr.h               | 32 ++++++++++-----
 be/src/runtime/memory/mem_tracker.h             |  1 +
 be/src/runtime/memory/mem_tracker_limiter.h     |  1 +
 be/src/runtime/memory/mem_tracker_task_pool.cpp |  2 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.h  |  1 -
 be/src/runtime/runtime_filter_mgr.cpp           |  4 +-
 be/src/runtime/runtime_filter_mgr.h             |  2 +-
 be/src/runtime/tablets_channel.cpp              | 26 ++++++++----
 be/src/runtime/tablets_channel.h                | 11 ++---
 be/src/runtime/thread_context.cpp               | 16 ++++----
 be/src/runtime/thread_context.h                 |  8 ++++
 be/src/service/doris_main.cpp                   |  4 +-
 be/src/vec/exec/join/vhash_join_node.cpp        |  2 +-
 be/src/vec/exec/vblocking_join_node.cpp         |  2 +-
 be/src/vec/exec/vbroker_scan_node.cpp           |  2 +-
 be/src/vec/exec/ves_http_scan_node.cpp          |  2 +-
 be/src/vec/exec/volap_scan_node.cpp             |  2 +-
 be/src/vec/exec/vschema_scan_node.cpp           |  2 +-
 be/test/olap/delta_writer_test.cpp              |  6 +--
 40 files changed, 215 insertions(+), 168 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index cf04e8d3d3..2d627682bb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -631,16 +631,8 @@ CONF_mInt32(remote_storage_read_buffer_mb, "16");
 // Whether Hook TCmalloc new/delete, currently consume/release tls mem tracker 
in Hook.
 CONF_Bool(enable_tcmalloc_hook, "true");
 
-// If true, switch TLS MemTracker to count more detailed memory,
-// including caches such as ExecNode operators and TabletManager.
-//
-// At present, there is a performance problem in the frequent switch thread 
mem tracker.
-// This is because the mem tracker exists as a shared_ptr in the thread local. 
Each time it is switched,
-// the atomic variable use_count in the shared_ptr of the current tracker will 
be -1, and the tracker to be
-// replaced use_count +1, multi-threading Frequent changes to the same tracker 
shared_ptr are slow.
-// TODO: 1. Reduce unnecessary thread mem tracker switches,
-//       2. Consider using raw pointers for mem tracker in thread local
-CONF_Bool(memory_verbose_track, "false");
+// Print more detailed logs, more detailed records, etc.
+CONF_Bool(memory_debug, "false");
 
 // The minimum length when TCMalloc Hook consumes/releases MemTracker, consume 
size
 // smaller than this value will continue to accumulate. specified as number of 
bytes.
diff --git a/be/src/exec/blocking_join_node.cpp 
b/be/src/exec/blocking_join_node.cpp
index 5e5aee3714..0d366ab1d4 100644
--- a/be/src/exec/blocking_join_node.cpp
+++ b/be/src/exec/blocking_join_node.cpp
@@ -93,7 +93,7 @@ Status BlockingJoinNode::close(RuntimeState* state) {
 
 void BlockingJoinNode::build_side_thread(RuntimeState* state, 
std::promise<Status>* status) {
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     status->set_value(construct_build_side(state));
 }
 
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 9cff2da4ff..a281d29c6e 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -376,7 +376,7 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& 
scan_range,
 
 void BrokerScanNode::scanner_worker(int start_idx, int length) {
     SCOPED_ATTACH_TASK(_runtime_state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     // Clone expr context
     std::vector<ExprContext*> scanner_expr_ctxs;
     auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, 
&scanner_expr_ctxs);
diff --git a/be/src/exec/es_http_scan_node.cpp 
b/be/src/exec/es_http_scan_node.cpp
index 28205290cc..38de757f87 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -424,7 +424,7 @@ static std::string get_host_port(const 
std::vector<TNetworkAddress>& es_hosts) {
 
 void EsHttpScanNode::scanner_worker(int start_idx, int length, 
std::promise<Status>& p_status) {
     SCOPED_ATTACH_TASK(_runtime_state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     // Clone expr context
     std::vector<ExprContext*> scanner_expr_ctxs;
     DCHECK(start_idx < length);
diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index 8ad7ce9044..f12f6ca18a 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -39,8 +39,8 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* 
state) {
 }
 
 Status ExceptNode::open(RuntimeState* state) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(SetOperationNode::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // if a table is empty, the result must be empty
     if (_hash_tbl->size() == 0) {
         _hash_tbl_iterator = _hash_tbl->begin();
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index be94dcbb28..fe0d6d1fe1 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -220,7 +220,7 @@ Status ExecNode::prepare(RuntimeState* state) {
             std::bind<int64_t>(&RuntimeProfile::units_per_second, 
_rows_returned_counter,
                                runtime_profile()->total_time_counter()),
             "");
-    _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + 
_runtime_profile->name(),
+    _mem_tracker = std::make_shared<MemTracker>("ExecNode:" + 
_runtime_profile->name(),
                                                 _runtime_profile.get());
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 20e0785d94..296726b924 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -194,6 +194,7 @@ public:
     RuntimeProfile::Counter* memory_used_counter() const { return 
_memory_used_counter; }
 
     MemTracker* mem_tracker() const { return _mem_tracker.get(); }
+    std::shared_ptr<MemTracker> mem_tracker_shared() const { return 
_mem_tracker; }
 
     OpentelemetrySpan get_next_span() { return _get_next_span; }
 
@@ -298,7 +299,7 @@ protected:
     std::unique_ptr<RuntimeProfile> _runtime_profile;
 
     /// Account for peak memory used by this node
-    std::unique_ptr<MemTracker> _mem_tracker;
+    std::shared_ptr<MemTracker> _mem_tracker;
 
     RuntimeProfile::Counter* _rows_returned_counter;
     RuntimeProfile::Counter* _rows_returned_rate;
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 7e9a2bf989..592c93ee7e 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -184,7 +184,7 @@ Status HashJoinNode::close(RuntimeState* state) {
 
 void HashJoinNode::probe_side_open_thread(RuntimeState* state, 
std::promise<Status>* status) {
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     status->set_value(child(0)->open(state));
 }
 
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index 9bf02cedd4..d04ddc5061 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -43,8 +43,8 @@ Status IntersectNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 // 2 probe with child(1), then filter the hash table and find the matched 
item, use them to rebuild a hash table
 // repeat [2] this for all the rest child
 Status IntersectNode::open(RuntimeState* state) {
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(SetOperationNode::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // if a table is empty, the result must be empty
     if (_hash_tbl->size() == 0) {
         _hash_tbl_iterator = _hash_tbl->begin();
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index ef4f2236c5..1dcd5c3754 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -191,7 +191,7 @@ Status OlapScanNode::prepare(RuntimeState* state) {
     _init_counter(state);
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
 
-    _scanner_mem_tracker = std::make_unique<MemTracker>("Scanners");
+    _scanner_mem_tracker = std::make_shared<MemTracker>("Scanners");
 
     if (_tuple_desc == nullptr) {
         // TODO: make sure we print all available diagnostic output to our 
error log
@@ -941,7 +941,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) 
{
             }
             OlapScanner* scanner =
                     new OlapScanner(state, this, 
_olap_scan_node.is_preaggregation,
-                                    _need_agg_finalize, *scan_range, 
_scanner_mem_tracker.get());
+                                    _need_agg_finalize, *scan_range, 
_scanner_mem_tracker);
             scanner->set_batch_size(_batch_size);
             // add scanner to pool before doing prepare.
             // so that scanner can be automatically deconstructed if prepare 
failed.
@@ -1483,7 +1483,7 @@ Status 
OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
 void OlapScanNode::transfer_thread(RuntimeState* state) {
     // scanner open pushdown to scanThread
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     Status status = Status::OK();
     for (auto scanner : _olap_scanners) {
         status = Expr::clone_if_not_exists(_conjunct_ctxs, state, 
scanner->conjunct_ctxs());
@@ -1660,8 +1660,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
 }
 
 void OlapScanNode::scanner_thread(OlapScanner* scanner) {
-    // SCOPED_ATTACH_TASK(_runtime_state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     Thread::set_self_name("olap_scanner");
     if (UNLIKELY(_transfer_done)) {
         _scanner_done = true;
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index f5d6e38cfd..47cf6cd01c 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -264,7 +264,7 @@ protected:
 
     int64_t _buffered_bytes;
     // Count the memory consumption of Rowset Reader and Tablet Reader in 
OlapScanner.
-    std::unique_ptr<MemTracker> _scanner_mem_tracker;
+    std::shared_ptr<MemTracker> _scanner_mem_tracker;
     EvalConjunctsFn _eval_conjuncts_fn;
 
     // the max num of scan keys of this scan request.
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 6f348d02a1..e90bc2124c 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -43,7 +43,7 @@ namespace doris {
 
 OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, 
bool aggregation,
                          bool need_agg_finalize, const TPaloScanRange& 
scan_range,
-                         MemTracker* tracker)
+                         const std::shared_ptr<MemTracker>& tracker)
         : _runtime_state(runtime_state),
           _parent(parent),
           _tuple_desc(parent->_tuple_desc),
@@ -51,8 +51,8 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, 
OlapScanNode* parent, bool
           _is_open(false),
           _aggregation(aggregation),
           _need_agg_finalize(need_agg_finalize),
-          _version(-1) {
-    _mem_tracker = tracker;
+          _version(-1),
+          _mem_tracker(tracker) {
     _tablet_schema = std::make_shared<TabletSchema>();
 }
 
@@ -326,7 +326,7 @@ Status OlapScanner::get_batch(RuntimeState* state, 
RowBatch* batch, bool* eof) {
     bzero(tuple_buf, _batch_size * _tuple_desc->byte_size());
     Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buf);
 
-    std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker));
+    std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker.get()));
     int64_t raw_rows_threshold = raw_rows_read() + 
config::doris_scanner_row_num;
     int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
     {
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 134a515219..961f80c9d0 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -42,7 +42,8 @@ class OlapScanNode;
 class OlapScanner {
 public:
     OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool 
aggregation,
-                bool need_agg_finalize, const TPaloScanRange& scan_range, 
MemTracker* tracker);
+                bool need_agg_finalize, const TPaloScanRange& scan_range,
+                const std::shared_ptr<MemTracker>& tracker);
 
     virtual ~OlapScanner() = default;
 
@@ -151,7 +152,7 @@ protected:
 
     MonotonicStopWatch _watcher;
 
-    MemTracker* _mem_tracker;
+    std::shared_ptr<MemTracker> _mem_tracker;
 
     TabletSchemaSPtr _tablet_schema;
 };
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index c8a36fbec7..39185cdbac 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -33,14 +33,14 @@
 
 namespace doris {
 
-Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer,
-                         const std::shared_ptr<MemTrackerLimiter>& 
parent_tracker, bool is_vec) {
-    *writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker, 
is_vec);
+Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, const 
UniqueId& load_id,
+                         bool is_vec) {
+    *writer = new DeltaWriter(req, StorageEngine::instance(), load_id, is_vec);
     return Status::OK();
 }
 
-DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
-                         const std::shared_ptr<MemTrackerLimiter>& 
parent_tracker, bool is_vec)
+DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, 
const UniqueId& load_id,
+                         bool is_vec)
         : _req(*req),
           _tablet(nullptr),
           _cur_rowset(nullptr),
@@ -48,7 +48,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* 
storage_engine,
           _tablet_schema(new TabletSchema),
           _delta_written_success(false),
           _storage_engine(storage_engine),
-          _parent_tracker(parent_tracker),
+          _load_id(load_id),
           _is_vec(is_vec) {}
 
 DeltaWriter::~DeltaWriter() {
@@ -109,8 +109,6 @@ Status DeltaWriter::init() {
         _rowset_ids = _tablet->all_rs_id(_cur_max_version);
     }
 
-    _mem_tracker = std::make_shared<MemTrackerLimiter>(
-            -1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()), 
_parent_tracker);
     // check tablet version number
     if (_tablet->version_count() > config::max_tablet_version_num) {
         //trigger quick compaction
@@ -164,8 +162,6 @@ Status DeltaWriter::write(Tuple* tuple) {
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
-
     _mem_table->insert(tuple);
 
     // if memtable is full, push it to the flush executor,
@@ -281,9 +277,21 @@ void DeltaWriter::_reset_mem_table() {
     if (_tablet->enable_unique_key_merge_on_write() && _delete_bitmap == 
nullptr) {
         _delete_bitmap.reset(new DeleteBitmap(_tablet->tablet_id()));
     }
+    auto mem_table_insert_tracker = std::make_shared<MemTracker>(
+            
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
+                        std::to_string(tablet_id()), _mem_table_num, 
_load_id.to_string()));
+    auto mem_table_flush_tracker = std::make_shared<MemTracker>(
+            
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
+                        std::to_string(tablet_id()), _mem_table_num++, 
_load_id.to_string()));
+    {
+        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
+        _mem_table_tracker.push_back(mem_table_insert_tracker);
+        _mem_table_tracker.push_back(mem_table_flush_tracker);
+    }
     _mem_table.reset(new MemTable(_tablet, _schema.get(), 
_tablet_schema.get(), _req.slots,
                                   _req.tuple_desc, _rowset_writer.get(), 
_delete_bitmap,
-                                  _rowset_ids, _cur_max_version, _mem_tracker, 
_is_vec));
+                                  _rowset_ids, _cur_max_version, 
mem_table_insert_tracker,
+                                  mem_table_flush_tracker, _is_vec));
 }
 
 Status DeltaWriter::close() {
@@ -384,8 +392,9 @@ Status DeltaWriter::cancel() {
 }
 
 void DeltaWriter::save_mem_consumption_snapshot() {
+    std::lock_guard<std::mutex> l(_lock);
     _mem_consumption_snapshot = mem_consumption();
-    _memtable_consumption_snapshot = memtable_consumption();
+    _memtable_consumption_snapshot = _mem_table->memory_usage();
 }
 
 int64_t DeltaWriter::get_memtable_consumption_inflush() const {
@@ -397,20 +406,20 @@ int64_t DeltaWriter::get_memtable_consumption_snapshot() 
const {
     return _memtable_consumption_snapshot;
 }
 
-int64_t DeltaWriter::mem_consumption() const {
-    if (_mem_tracker == nullptr) {
+int64_t DeltaWriter::mem_consumption() {
+    if (_flush_token == nullptr) {
         // This method may be called before this writer is initialized.
-        // So _mem_tracker may be null.
+        // So _flush_token may be null.
         return 0;
     }
-    return _mem_tracker->consumption();
-}
-
-int64_t DeltaWriter::memtable_consumption() const {
-    if (_mem_table == nullptr) {
-        return 0;
+    int64_t mem_usage = 0;
+    {
+        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
+        for (auto mem_table_tracker : _mem_table_tracker) {
+            mem_usage += mem_table_tracker->consumption();
+        }
     }
-    return _mem_table->mem_tracker_hook()->consumption();
+    return mem_usage;
 }
 
 int64_t DeltaWriter::partition_id() const {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index a7770326fd..85133e456c 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -20,6 +20,7 @@
 #include "gen_cpp/internal_service.pb.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/tablet.h"
+#include "util/spinlock.h"
 
 namespace doris {
 
@@ -56,9 +57,7 @@ struct WriteRequest {
 class DeltaWriter {
 public:
     static Status open(WriteRequest* req, DeltaWriter** writer,
-                       const std::shared_ptr<MemTrackerLimiter>& 
parent_tracker =
-                               std::shared_ptr<MemTrackerLimiter>(),
-                       bool is_vec = false);
+                       const UniqueId& load_id = TUniqueId(), bool is_vec = 
false);
 
     ~DeltaWriter();
 
@@ -93,7 +92,7 @@ public:
 
     int64_t partition_id() const;
 
-    int64_t mem_consumption() const;
+    int64_t mem_consumption();
 
     // Wait all memtable in flush queue to be flushed
     Status wait_flush();
@@ -102,8 +101,6 @@ public:
 
     int32_t schema_hash() { return _tablet->schema_hash(); }
 
-    int64_t memtable_consumption() const;
-
     void save_mem_consumption_snapshot();
 
     int64_t get_memtable_consumption_inflush() const;
@@ -113,8 +110,8 @@ public:
     void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
 
 private:
-    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
-                const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool 
is_vec);
+    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const 
UniqueId& load_id,
+                bool is_vec);
 
     // push a full memtable to flush executor
     Status _flush_memtable_async();
@@ -146,14 +143,11 @@ private:
     bool _delta_written_success;
 
     StorageEngine* _storage_engine;
+    UniqueId _load_id;
     std::unique_ptr<FlushToken> _flush_token;
-    // The memory value automatically tracked by the Tcmalloc hook is 20% less 
than the manually recorded
-    // value in the memtable, because some freed memory is not allocated in 
the DeltaWriter.
-    // The memory value automatically tracked by the Tcmalloc hook, used for 
load channel mgr to trigger
-    // flush memtable when the sum of all channel memory exceeds the limit.
-    // The manually recorded value of memtable is used to flush when it is 
larger than write_buffer_size.
-    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
-    std::shared_ptr<MemTrackerLimiter> _parent_tracker;
+    std::vector<std::shared_ptr<MemTracker>> _mem_table_tracker;
+    SpinLock _mem_table_tracker_lock;
+    std::atomic<uint32_t> _mem_table_num = 1;
 
     // The counter of number of segment flushed already.
     int64_t _segment_counter = 0;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 5eed903d29..f61e945bda 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -35,14 +35,14 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, 
const TabletSchema* t
                    const std::vector<SlotDescriptor*>* slot_descs, 
TupleDescriptor* tuple_desc,
                    RowsetWriter* rowset_writer, DeleteBitmapPtr delete_bitmap,
                    const RowsetIdUnorderedSet& rowset_ids, int64_t 
cur_max_version,
-                   const std::shared_ptr<MemTrackerLimiter>& tracker, bool 
support_vec)
+                   const std::shared_ptr<MemTracker>& insert_mem_tracker,
+                   const std::shared_ptr<MemTracker>& flush_mem_tracker, bool 
support_vec)
         : _tablet(std::move(tablet)),
           _schema(schema),
           _tablet_schema(tablet_schema),
           _slot_descs(slot_descs),
-          _mem_tracker_hook(std::make_shared<MemTrackerLimiter>(
-                  -1, fmt::format("MemTableHook:tabletId={}", 
std::to_string(tablet_id())),
-                  tracker)),
+          _insert_mem_tracker(insert_mem_tracker),
+          _flush_mem_tracker(flush_mem_tracker),
           _schema_size(_schema->schema_size()),
           _rowset_writer(rowset_writer),
           _is_first_insertion(true),
@@ -53,12 +53,10 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, 
const TabletSchema* t
           _delete_bitmap(delete_bitmap),
           _rowset_ids(rowset_ids),
           _cur_max_version(cur_max_version) {
-    _mem_tracker_hook->enable_reset_zero();
-    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
-    _mem_tracker_manual = std::make_unique<MemTracker>(
-            fmt::format("MemTableManual:tabletId={}", 
std::to_string(tablet_id())));
-    _buffer_mem_pool = std::make_unique<MemPool>(_mem_tracker_manual.get());
-    _table_mem_pool = std::make_unique<MemPool>(_mem_tracker_manual.get());
+    _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
+            fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id())));
+    _buffer_mem_pool = std::make_unique<MemPool>(_insert_mem_tracker.get());
+    _table_mem_pool = std::make_unique<MemPool>(_insert_mem_tracker.get());
     if (support_vec) {
         _skip_list = nullptr;
         _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
@@ -153,12 +151,14 @@ MemTable::~MemTable() {
         }
     }
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), 
std::default_delete<RowInBlock>());
-    _mem_tracker_manual->release(_mem_usage);
+    _insert_mem_tracker->release(_mem_usage);
     _buffer_mem_pool->free_all();
     _table_mem_pool->free_all();
-    DCHECK_EQ(_mem_tracker_manual->consumption(), 0)
+    _flush_mem_tracker->set_consumption(0);
+    DCHECK_EQ(_insert_mem_tracker->consumption(), 0)
             << std::endl
-            << MemTracker::log_usage(_mem_tracker_manual->make_snapshot(0));
+            << MemTracker::log_usage(_insert_mem_tracker->make_snapshot(0));
+    DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
 }
 
 MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : 
_schema(schema) {}
@@ -176,7 +176,7 @@ int MemTable::RowInBlockComparator::operator()(const 
RowInBlock* left,
 }
 
 void MemTable::insert(const vectorized::Block* input_block, const 
std::vector<int>& row_idxs) {
-    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+    SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
     auto target_block = input_block->copy_block(_column_offset);
     if (_is_first_insertion) {
         _is_first_insertion = false;
@@ -193,8 +193,7 @@ void MemTable::insert(const vectorized::Block* input_block, 
const std::vector<in
     _input_mutable_block.add_rows(&target_block, row_idxs.data(), 
row_idxs.data() + num_rows);
     size_t input_size = target_block.allocated_bytes() * num_rows / 
target_block.rows();
     _mem_usage += input_size;
-    _mem_tracker_manual->consume(input_size);
-
+    _insert_mem_tracker->consume(input_size);
     for (int i = 0; i < num_rows; i++) {
         _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + 
i});
         _insert_one_row_from_block(_row_in_blocks.back());
@@ -374,7 +373,8 @@ void MemTable::_collect_vskiplist_results() {
         if constexpr (!is_final) {
             // if is not final, we collect the agg results to input_block and 
then continue to insert
             size_t shrunked_after_agg = 
_output_mutable_block.allocated_bytes();
-            _mem_tracker_manual->consume(shrunked_after_agg - _mem_usage);
+            // flush will not run here, so will not duplicate 
`_flush_mem_tracker`
+            _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage);
             _mem_usage = shrunked_after_agg;
             _input_mutable_block.swap(_output_mutable_block);
             //TODO(weixang):opt here.
@@ -392,7 +392,7 @@ void MemTable::_collect_vskiplist_results() {
 }
 
 void MemTable::shrink_memtable_by_agg() {
-    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+    SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
     if (keys_type() == KeysType::DUP_KEYS) {
         return;
     }
@@ -434,7 +434,7 @@ Status MemTable::_generate_delete_bitmap() {
 }
 
 Status MemTable::flush() {
-    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+    SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker.get());
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
                   << ", memsize: " << memory_usage() << ", rows: " << _rows;
     int64_t duration_ns = 0;
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 9d837b5560..52589f35cc 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -45,18 +45,17 @@ public:
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* 
tuple_desc,
              RowsetWriter* rowset_writer, DeleteBitmapPtr delete_bitmap,
              const RowsetIdUnorderedSet& rowset_ids, int64_t cur_max_version,
-             const std::shared_ptr<MemTrackerLimiter>& tracker, bool 
support_vec = false);
+             const std::shared_ptr<MemTracker>& insert_mem_tracker,
+             const std::shared_ptr<MemTracker>& flush_mem_tracker, bool 
support_vec = false);
     ~MemTable();
 
     int64_t tablet_id() const { return _tablet->tablet_id(); }
     KeysType keys_type() const { return _tablet->keys_type(); }
-    std::shared_ptr<MemTrackerLimiter> mem_tracker_hook() const { return 
_mem_tracker_hook; }
-    size_t memory_usage() const { return _mem_tracker_manual->consumption(); }
-
-    inline void insert(const Tuple* tuple) {
-        SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
-        (this->*_insert_fn)(tuple);
+    size_t memory_usage() const {
+        return _insert_mem_tracker->consumption() + 
_flush_mem_tracker->consumption();
     }
+
+    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
     // insert tuple from (row_pos) to (row_pos+num_rows)
     void insert(const vectorized::Block* block, const std::vector<int>& 
row_idxs);
 
@@ -161,8 +160,16 @@ private:
 
     std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
 
-    std::unique_ptr<MemTracker> _mem_tracker_manual;
-    std::shared_ptr<MemTrackerLimiter> _mem_tracker_hook;
+    // `_insert_mem_tracker` manually records the memory value of 
memtable insert().
+    // `_flush_mem_tracker` automatically records the memory value of 
memtable flush() through the mem hook.
+    // They are used to flush when `_insert_mem_tracker` is larger than 
write_buffer_size, and to run flush memtable
+    // when the sum of all memtables (`_insert_mem_tracker` + 
`_flush_mem_tracker`) exceeds the limit.
+    std::shared_ptr<MemTracker> _insert_mem_tracker;
+    std::shared_ptr<MemTracker> _flush_mem_tracker;
+    // `_insert_mem_tracker_use_hook` is only used for verification when the value of 
`_insert_mem_tracker` is suspected to be wrong.
+    // The memory value automatically tracked by the mem hook is 20% less than 
the manually recorded
+    // value in the memtable, because some freed memory is not allocated in 
the DeltaWriter.
+    std::unique_ptr<MemTracker> _insert_mem_tracker_use_hook;
     // This is a buffer, to hold the memory referenced by the rows that have 
not
     // been inserted into the SkipList
     std::unique_ptr<MemPool> _buffer_mem_pool;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 7cc81b782e..7580b1aed3 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -722,7 +722,6 @@ Status TabletManager::load_tablet_from_meta(DataDir* 
data_dir, TTabletId tablet_
                                             TSchemaHash schema_hash, const 
string& meta_binary,
                                             bool update_meta, bool force, bool 
restore,
                                             bool check_path) {
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     TabletMetaSharedPtr tablet_meta(new TabletMeta());
     Status status = tablet_meta->deserialize(meta_binary);
     if (!status.ok()) {
@@ -805,7 +804,6 @@ Status TabletManager::load_tablet_from_meta(DataDir* 
data_dir, TTabletId tablet_
 Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
                                            SchemaHash schema_hash, const 
string& schema_hash_path,
                                            bool force, bool restore) {
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     LOG(INFO) << "begin to load tablet from dir. "
               << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash
               << " path = " << schema_hash_path << " force = " << force << " 
restore = " << restore;
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index c989f11336..ba15c6c7b4 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,11 +25,11 @@
 
 namespace doris {
 
-LoadChannel::LoadChannel(const UniqueId& load_id, 
std::shared_ptr<MemTrackerLimiter>& mem_tracker,
+LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> 
mem_tracker,
                          int64_t timeout_s, bool is_high_priority, const 
std::string& sender_ip,
                          bool is_vec)
         : _load_id(load_id),
-          _mem_tracker(mem_tracker),
+          _mem_tracker(std::move(mem_tracker)),
           _timeout_s(timeout_s),
           _is_high_priority(is_high_priority),
           _sender_ip(sender_ip),
@@ -38,7 +38,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, 
std::shared_ptr<MemTrackerLimi
     // _load_channels in load_channel_mgr, or it may be erased
     // immediately by gc thread.
     _last_updated_time.store(time(nullptr));
-    _mem_tracker->enable_reset_zero();
 }
 
 LoadChannel::~LoadChannel() {
@@ -59,8 +58,11 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& 
params) {
         } else {
             // create a new tablets channel
             TabletsChannelKey key(params.id(), index_id);
-            channel.reset(new TabletsChannel(key, _mem_tracker, 
_is_high_priority, _is_vec));
-            _tablets_channels.insert({index_id, channel});
+            channel.reset(new TabletsChannel(key, _load_id, _is_high_priority, 
_is_vec));
+            {
+                std::lock_guard<SpinLock> l(_tablets_channels_lock);
+                _tablets_channels.insert({index_id, channel});
+            }
         }
     }
 
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index a75639e433..5af420a970 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,9 +39,8 @@ class Cache;
 // corresponding to a certain load job
 class LoadChannel {
 public:
-    LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>& 
mem_tracker,
-                int64_t timeout_s, bool is_high_priority, const std::string& 
sender_ip,
-                bool is_vec);
+    LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> 
mem_tracker, int64_t timeout_s,
+                bool is_high_priority, const std::string& sender_ip, bool 
is_vec);
     ~LoadChannel();
 
     // open a new load channel if not exist
@@ -67,7 +66,17 @@ public:
     template <typename TabletWriterAddResult>
     Status handle_mem_exceed_limit(TabletWriterAddResult* response);
 
-    int64_t mem_consumption() const { return _mem_tracker->consumption(); }
+    int64_t mem_consumption() {
+        int64_t mem_usage = 0;
+        {
+            std::lock_guard<SpinLock> l(_tablets_channels_lock);
+            for (auto& it : _tablets_channels) {
+                mem_usage += it.second->mem_consumption();
+            }
+        }
+        _mem_tracker->set_consumption(mem_usage);
+        return mem_usage;
+    }
 
     int64_t timeout() const { return _timeout_s; }
 
@@ -89,7 +98,10 @@ protected:
                 request.write_single_replica()));
         if (finished) {
             std::lock_guard<std::mutex> l(_lock);
-            _tablets_channels.erase(index_id);
+            {
+                std::lock_guard<SpinLock> l(_tablets_channels_lock);
+                _tablets_channels.erase(index_id);
+            }
             _finished_channel_ids.emplace(index_id);
         }
         return Status::OK();
@@ -102,12 +114,13 @@ private:
 
     UniqueId _load_id;
     // Tracks the total memory consumed by current load job on this BE
-    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+    std::unique_ptr<MemTracker> _mem_tracker;
 
     // lock protect the tablets channel map
     std::mutex _lock;
     // index id -> tablets channel
     std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> 
_tablets_channels;
+    SpinLock _tablets_channels_lock;
     // This is to save finished channels id, to handle the retry request.
     std::unordered_set<int64_t> _finished_channel_ids;
     // set to true if at least one tablets channel has been opened
@@ -163,7 +176,7 @@ Status LoadChannel::add_batch(const TabletWriterAddRequest& 
request,
     return st;
 }
 
-inline std::ostream& operator<<(std::ostream& os, const LoadChannel& 
load_channel) {
+inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
     os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << 
load_channel.mem_consumption()
        << ", last_update_time=" << 
static_cast<uint64_t>(load_channel.last_updated_time())
        << ", is high priority: " << load_channel.is_high_priority() << ")";
@@ -176,7 +189,7 @@ Status 
LoadChannel::handle_mem_exceed_limit(TabletWriterAddResult* response) {
     std::shared_ptr<TabletsChannel> channel;
     {
         // lock so that only one thread can check mem limit
-        std::lock_guard<std::mutex> l(_lock);
+        std::lock_guard<SpinLock> l(_tablets_channels_lock);
         found = _find_largest_consumption_channel(&channel);
     }
     // Release lock so that other threads can still call add_batch 
concurrently.
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index e6f908f69c..8292b25656 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -68,11 +68,9 @@ LoadChannelMgr::~LoadChannelMgr() {
 }
 
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
-    int64_t load_mgr_mem_limit = 
calc_process_max_load_memory(process_mem_limit);
-    _load_soft_mem_limit = load_mgr_mem_limit * 
config::load_process_soft_mem_limit_percent / 100;
-    _process_soft_mem_limit =
-            ExecEnv::GetInstance()->process_mem_tracker()->limit() * 
config::soft_mem_limit_frac;
-    _mem_tracker = std::make_shared<MemTrackerLimiter>(load_mgr_mem_limit, 
"LoadChannelMgr");
+    _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
+    _load_soft_mem_limit = _load_hard_mem_limit * 
config::load_process_soft_mem_limit_percent / 100;
+    _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
     REGISTER_HOOK_METRIC(load_channel_mem_consumption,
                          [this]() { return _mem_tracker->consumption(); });
     _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
@@ -96,13 +94,10 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& 
params) {
             bool is_high_priority = (params.has_is_high_priority() && 
params.is_high_priority());
 
             // Use the same mem limit as LoadChannelMgr for a single load 
channel
-            auto channel_mem_tracker = std::make_shared<MemTrackerLimiter>(
-                    _mem_tracker->limit(),
-                    fmt::format("LoadChannel#senderIp={}#loadID={}", 
params.sender_ip(),
-                                load_id.to_string()),
-                    _mem_tracker);
-            channel.reset(new LoadChannel(load_id, channel_mem_tracker, 
channel_timeout_s,
-                                          is_high_priority, params.sender_ip(),
+            auto channel_mem_tracker = 
std::make_unique<MemTracker>(fmt::format(
+                    "LoadChannel#senderIp={}#loadID={}", params.sender_ip(), 
load_id.to_string()));
+            channel.reset(new LoadChannel(load_id, 
std::move(channel_mem_tracker),
+                                          channel_timeout_s, is_high_priority, 
params.sender_ip(),
                                           params.is_vectorized()));
             _load_channels.insert({load_id, channel});
         }
@@ -200,7 +195,7 @@ Status LoadChannelMgr::_start_load_channels_clean() {
 
     // this log print every 1 min, so that we could observe the mem 
consumption of load process
     // on this Backend
-    LOG(INFO) << "load mem consumption(bytes). limit: " << 
_mem_tracker->limit()
+    LOG(INFO) << "load mem consumption(bytes). limit: " << _load_hard_mem_limit
               << ", current: " << _mem_tracker->consumption()
               << ", peak: " << _mem_tracker->peak_consumption();
 
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index 686322b076..4fd113cc77 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -28,6 +28,7 @@
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "gutil/ref_counted.h"
+#include "gutil/walltime.h"
 #include "olap/lru_cache.h"
 #include "runtime/load_channel.h"
 #include "runtime/tablets_channel.h"
@@ -58,6 +59,14 @@ public:
     // cancel all tablet stream for 'load_id' load
     Status cancel(const PTabletWriterCancelRequest& request);
 
+    void refresh_mem_tracker() {
+        int64_t mem_usage = 0;
+        for (auto& kv : _load_channels) {
+            mem_usage += kv.second->mem_consumption();
+        }
+        _mem_tracker->set_consumption(mem_usage);
+    }
+
 private:
     template <typename Request>
     Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& 
is_eof,
@@ -80,9 +89,9 @@ protected:
     Cache* _last_success_channel = nullptr;
 
     // check the total load channel mem consumption of this Backend
-    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+    std::unique_ptr<MemTracker> _mem_tracker;
+    int64_t _load_hard_mem_limit = -1;
     int64_t _load_soft_mem_limit = -1;
-    int64_t _process_soft_mem_limit = -1;
 
     // If hard limit reached, one thread will trigger load channel flush,
     // other threads should wait on the condition variable.
@@ -153,9 +162,9 @@ template <typename TabletWriterAddResult>
 Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* 
response) {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
-    DCHECK(_process_soft_mem_limit > 0);
+    int64_t process_mem_limit = MemInfo::mem_limit() * 
config::soft_mem_limit_frac;
     if (_mem_tracker->consumption() < _load_soft_mem_limit &&
-        MemInfo::proc_mem_no_allocator_cache() < _process_soft_mem_limit) {
+        MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
         return Status::OK();
     }
     // Pick load channel to reduce memory.
@@ -163,14 +172,15 @@ Status 
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
     {
         std::unique_lock<std::mutex> l(_lock);
         while (_should_wait_flush) {
-            LOG(INFO) << "Reached the load hard limit " << 
_mem_tracker->limit()
+            LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
                       << ", waiting for flush";
             _wait_flush_cond.wait(l);
         }
         // Some other thread is flushing data, and not reached hard limit now,
         // we don't need to handle mem limit in current thread.
-        if (_reduce_memory_channel != nullptr && 
!_mem_tracker->limit_exceeded() &&
-            MemInfo::proc_mem_no_allocator_cache() < _process_soft_mem_limit) {
+        if (_reduce_memory_channel != nullptr &&
+            _mem_tracker->consumption() < _load_hard_mem_limit &&
+            MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
             return Status::OK();
         }
 
@@ -199,13 +209,13 @@ Status 
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
         _reduce_memory_channel = channel;
 
         std::ostringstream oss;
-        if (MemInfo::proc_mem_no_allocator_cache() < _process_soft_mem_limit) {
+        if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
             oss << "reducing memory of " << *channel << " because total load 
mem consumption "
                 << PrettyPrinter::print(_mem_tracker->consumption(), 
TUnit::BYTES)
                 << " has exceeded";
-            if (_mem_tracker->limit_exceeded()) {
+            if (_mem_tracker->consumption() > _load_hard_mem_limit) {
                 _should_wait_flush = true;
-                oss << " hard limit: " << 
PrettyPrinter::print(_mem_tracker->limit(), TUnit::BYTES);
+                oss << " hard limit: " << 
PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
             } else {
                 oss << " soft limit: " << 
PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
             }
@@ -213,7 +223,7 @@ Status 
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
             _should_wait_flush = true;
             oss << "reducing memory of " << *channel << " because process 
memory used "
                 << PerfCounters::get_vm_rss_str() << " has exceeded limit "
-                << PrettyPrinter::print(_process_soft_mem_limit, TUnit::BYTES)
+                << PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
                 << " , tc/jemalloc allocator cache " << 
MemInfo::allocator_cache_mem_str();
         }
         LOG(INFO) << oss.str();
diff --git a/be/src/runtime/memory/mem_tracker.h 
b/be/src/runtime/memory/mem_tracker.h
index c939d9a62d..01a0d58cdb 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -72,6 +72,7 @@ public:
     void release(int64_t bytes) { consume(-bytes); }
     // Transfer 'bytes' of consumption from this tracker to 'dst'.
     void transfer_to(MemTracker* dst, int64_t bytes);
+    void set_consumption(int64_t bytes) { _consumption->set(bytes); }
 
 public:
     bool limit_exceeded(int64_t limit) const { return limit >= 0 && limit < 
consumption(); }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index f6440c0ace..6bc9449c20 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -82,6 +82,7 @@ public:
         return false;
     }
 
+    void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption 
not supported"; }
     int64_t group_num() const { return _group_num; }
     bool has_limit() const { return _limit >= 0; }
     int64_t limit() const { return _limit; }
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp 
b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 58d98278c6..28539703b4 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -108,7 +108,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
                     MemTracker::print_bytes(it->second->consumption()),
                     MemTracker::print_bytes(it->second->peak_consumption()));
             expired_task_ids.emplace_back(it->first);
-        } else if (config::memory_verbose_track) {
+        } else if (config::memory_debug) {
             it->second->print_log_usage("query routine");
             it->second->enable_print_log_usage();
         }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 5289db99e3..4661fbea5d 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -170,7 +170,6 @@ private:
 inline void ThreadMemTrackerMgr::init() {
     DCHECK(_limiter_tracker_stack.size() == 0);
     DCHECK(_limiter_tracker_raw == nullptr);
-    DCHECK(_consumer_tracker_stack.empty());
     init_impl();
 }
 
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 9bc95e2de1..8087d6efc5 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -160,7 +160,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId 
query_id, UniqueId frag
                                                 const TQueryOptions& 
query_options) {
     _query_id = query_id;
     _fragment_instance_id = fragment_instance_id;
-    _mem_tracker = 
std::make_unique<MemTracker>("RuntimeFilterMergeControllerEntity");
+    _mem_tracker = 
std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity");
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) 
{
         int filter_id = filterid_to_desc.first;
@@ -181,7 +181,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId 
query_id, UniqueId frag
 // merge data
 Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* 
request,
                                                  
butil::IOBufAsZeroCopyInputStream* attach_data) {
-    // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     std::shared_ptr<RuntimeFilterCntlVal> cntVal;
     int merged_size = 0;
     {
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index e090be6f4f..63960f0182 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -147,7 +147,7 @@ private:
     UniqueId _fragment_instance_id;
     // protect _filter_map
     std::mutex _filter_map_mutex;
-    std::unique_ptr<MemTracker> _mem_tracker;
+    std::shared_ptr<MemTracker> _mem_tracker;
     // TODO: convert filter id to i32
     // filter-id -> val
     std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 9023c100f1..584e101199 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -34,16 +34,14 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, 
MetricUnit::NOUNIT);
 
 std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
 
-TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
-                               const std::shared_ptr<MemTrackerLimiter>& 
parent_tracker,
+TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const UniqueId& 
load_id,
                                bool is_high_priority, bool is_vec)
         : _key(key),
           _state(kInitialized),
+          _load_id(load_id),
           _closed_senders(64),
           _is_high_priority(is_high_priority),
           _is_vec(is_vec) {
-    _mem_tracker = std::make_shared<MemTrackerLimiter>(
-            -1, fmt::format("TabletsChannel#indexID={}", key.index_id), 
parent_tracker);
     static std::once_flag once_flag;
     std::call_once(once_flag, [] {
         REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return 
_s_tablet_writer_count.load(); });
@@ -194,6 +192,17 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
     }
 }
 
+int64_t TabletsChannel::mem_consumption() {
+    int64_t mem_usage = 0;
+    {
+        std::lock_guard<SpinLock> l(_tablet_writers_lock);
+        for (auto& it : _tablet_writers) {
+            mem_usage += it.second->mem_consumption();
+        }
+    }
+    return mem_usage;
+}
+
 template <typename TabletWriterAddResult>
 Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
     if (_try_to_wait_flushing()) {
@@ -243,7 +252,7 @@ Status 
TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
         // So here we only flush part of the tablet, and the next time the 
reduce memory operation is triggered,
         // the tablet that has not been flushed before will accumulate more 
data, thereby reducing the number of flushes.
 
-        int64_t mem_to_flushed = _mem_tracker->consumption() / 3;
+        int64_t mem_to_flushed = mem_consumption() / 3;
         if (total_memtable_consumption_in_flush < mem_to_flushed) {
             mem_to_flushed -= total_memtable_consumption_in_flush;
             int counter = 0;
@@ -349,7 +358,7 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& request
         wrequest.ptable_schema_param = request.schema();
 
         DeltaWriter* writer = nullptr;
-        auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker, _is_vec);
+        auto st = DeltaWriter::open(&wrequest, &writer, _load_id, _is_vec);
         if (!st.ok()) {
             std::stringstream ss;
             ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
@@ -358,7 +367,10 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& request
             LOG(WARNING) << ss.str();
             return Status::InternalError(ss.str());
         }
-        _tablet_writers.emplace(tablet.tablet_id(), writer);
+        {
+            std::lock_guard<SpinLock> l(_tablet_writers_lock);
+            _tablet_writers.emplace(tablet.tablet_id(), writer);
+        }
     }
     _s_tablet_writer_count += _tablet_writers.size();
     DCHECK_EQ(_tablet_writers.size(), request.tablets_size());
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index cf7e067b32..bfdbf0e8f7 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -61,8 +61,7 @@ class LoadChannel;
 // Write channel for a particular (load, index).
 class TabletsChannel {
 public:
-    TabletsChannel(const TabletsChannelKey& key,
-                   const std::shared_ptr<MemTrackerLimiter>& parent_tracker, 
bool is_high_priority,
+    TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool 
is_high_priority,
                    bool is_vec);
 
     ~TabletsChannel();
@@ -96,7 +95,7 @@ public:
     template <typename TabletWriterAddResult>
     Status reduce_mem_usage(TabletWriterAddResult* response);
 
-    int64_t mem_consumption() const { return _mem_tracker->consumption(); }
+    int64_t mem_consumption();
 
 private:
     template <typename Request>
@@ -119,6 +118,8 @@ private:
     // make execute sequence
     std::mutex _lock;
 
+    SpinLock _tablet_writers_lock;
+
     enum State {
         kInitialized,
         kOpened,
@@ -126,6 +127,8 @@ private:
     };
     State _state;
 
+    UniqueId _load_id;
+
     // initialized in open function
     int64_t _txn_id = -1;
     int64_t _index_id = -1;
@@ -160,8 +163,6 @@ private:
 
     static std::atomic<uint64_t> _s_tablet_writer_count;
 
-    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
-
     bool _is_high_priority = false;
 
     bool _is_vec = false;
diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index 1e6be4a0d9..babee85c3e 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -76,18 +76,20 @@ 
SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() {
 }
 
 AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* 
mem_tracker) {
-    if (config::memory_verbose_track) {
-        
thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
-    }
+    
thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
+}
+
+AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(
+        const std::shared_ptr<MemTracker>& mem_tracker)
+        : _mem_tracker(mem_tracker) {
+    
thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
 }
 
 AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
-    if (config::memory_verbose_track) {
 #ifndef NDEBUG
-        
DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1);
+    
DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1);
 #endif // NDEBUG
-        thread_context()->_thread_mem_tracker_mgr->pop_consumer_tracker();
-    }
+    thread_context()->_thread_mem_tracker_mgr->pop_consumer_tracker();
 }
 
 } // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 861b3e0567..7d213bfa4d 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -39,6 +39,7 @@
 // Compared to count `scope_mem`, MemTracker is easier to observe from the 
outside and is thread-safe.
 // Usage example: std::unique_ptr<MemTracker> tracker = 
std::make_unique<MemTracker>("first_tracker");
 //                { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); xxx; xxx; }
+// Usually used to record more detailed query memory, including ExecNode 
operators.
 #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \
     auto VARNAME_LINENUM(add_mem_consumer) = 
doris::AddThreadMemTrackerConsumer(mem_tracker)
 #else
@@ -300,9 +301,16 @@ public:
 
 class AddThreadMemTrackerConsumer {
 public:
+    // The owner and user of MemTracker are in the same thread, and the raw 
pointer is faster.
     explicit AddThreadMemTrackerConsumer(MemTracker* mem_tracker);
 
+    // The owner and user of MemTracker are in different threads.
+    explicit AddThreadMemTrackerConsumer(const std::shared_ptr<MemTracker>& 
mem_tracker);
+
     ~AddThreadMemTrackerConsumer();
+
+private:
+    std::shared_ptr<MemTracker> _mem_tracker = nullptr; // Avoid mem_tracker 
being released midway.
 };
 
 class StopCheckThreadMemTrackerLimit {
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index a48d3adb3e..b6a7e3a118 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -52,6 +52,7 @@
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
 #include "runtime/heartbeat_flags.h"
+#include "runtime/load_channel_mgr.h"
 #include "runtime/memory/mem_tracker_task_pool.h"
 #include "service/backend_options.h"
 #include "service/backend_service.h"
@@ -510,6 +511,7 @@ int main(int argc, char** argv) {
         doris::ExecEnv::GetInstance()->allocator_cache_mem_tracker()->consume(
                 allocator_cache_mem_diff);
         CONSUME_THREAD_MEM_TRACKER(allocator_cache_mem_diff);
+        
doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker();
 
         // 1s clear the expired task mem tracker, a query mem tracker is about 
57 bytes.
         // this will cause coredump for ASAN build when running regression 
test,
@@ -518,7 +520,7 @@ int main(int argc, char** argv) {
         // The process tracker print log usage interval is 1s to avoid a large 
number of tasks being
         // canceled when the process exceeds the mem limit, resulting in too 
many duplicate logs.
         
doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage();
-        if (doris::config::memory_verbose_track) {
+        if (doris::config::memory_debug) {
             
doris::ExecEnv::GetInstance()->process_mem_tracker()->print_log_usage("main 
routine");
             
doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage();
         }
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 91b5784da3..eb419eb52f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1268,7 +1268,7 @@ Status HashJoinNode::open(RuntimeState* state) {
 void HashJoinNode::_probe_side_open_thread(RuntimeState* state, 
std::promise<Status>* status) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"HashJoinNode::_hash_table_build_thread");
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     status->set_value(child(0)->open(state));
 }
 
diff --git a/be/src/vec/exec/vblocking_join_node.cpp 
b/be/src/vec/exec/vblocking_join_node.cpp
index 809da414f4..f4b9258c92 100644
--- a/be/src/vec/exec/vblocking_join_node.cpp
+++ b/be/src/vec/exec/vblocking_join_node.cpp
@@ -75,7 +75,7 @@ Status VBlockingJoinNode::close(RuntimeState* state) {
 
 void VBlockingJoinNode::build_side_thread(RuntimeState* state, 
std::promise<Status>* status) {
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     status->set_value(construct_build_side(state));
     // Release the thread token as soon as possible (before the main thread 
joins
     // on it).  This way, if we had a chain of 10 joins using 1 additional 
thread,
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp 
b/be/src/vec/exec/vbroker_scan_node.cpp
index 4e1177f4bd..f164b5a93d 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -273,7 +273,7 @@ Status VBrokerScanNode::scanner_scan(const 
TBrokerScanRange& scan_range, Scanner
 void VBrokerScanNode::scanner_worker(int start_idx, int length) {
     START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, 
"VBrokerScanNode::scanner_worker");
     SCOPED_ATTACH_TASK(_runtime_state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     Thread::set_self_name("vbroker_scanner");
     Status status = Status::OK();
     ScannerCounter counter;
diff --git a/be/src/vec/exec/ves_http_scan_node.cpp 
b/be/src/vec/exec/ves_http_scan_node.cpp
index 9f308acc6b..d0704a2477 100644
--- a/be/src/vec/exec/ves_http_scan_node.cpp
+++ b/be/src/vec/exec/ves_http_scan_node.cpp
@@ -385,7 +385,7 @@ void VEsHttpScanNode::debug_string(int ident_level, 
std::stringstream* out) cons
 void VEsHttpScanNode::scanner_worker(int start_idx, int length, 
std::promise<Status>& p_status) {
     START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, 
"VEsHttpScanNode::scanner_worker");
     SCOPED_ATTACH_TASK(_runtime_state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     // Clone expr context
     std::vector<ExprContext*> scanner_expr_ctxs;
     DCHECK(start_idx < length);
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index 2fbd9bb99d..19ab63ae37 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -279,7 +279,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
     // scanner open pushdown to scanThread
     START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VOlapScanNode::transfer_thread");
     SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
     Status status = Status::OK();
 
     if (_vconjunct_ctx_ptr) {
diff --git a/be/src/vec/exec/vschema_scan_node.cpp 
b/be/src/vec/exec/vschema_scan_node.cpp
index cdc258fe99..dfb6811cbf 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -108,9 +108,9 @@ Status VSchemaScanNode::open(RuntimeState* state) {
     }
 
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     if (_scanner_param.user) {
         TSetSessionParams param;
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index 3c1e7f9924..ea2799fdea 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -398,7 +398,7 @@ TEST_F(TestDeltaWriter, open) {
     SAFE_DELETE(delta_writer);
 
     // test vec delta writer
-    DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
+    DeltaWriter::open(&write_req, &delta_writer, TUniqueId(), true);
     EXPECT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
@@ -551,7 +551,7 @@ TEST_F(TestDeltaWriter, vec_write) {
     WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002,
                               30002, load_id,   tuple_desc,      
&(tuple_desc->slots())};
     DeltaWriter* delta_writer = nullptr;
-    DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
+    DeltaWriter::open(&write_req, &delta_writer, TUniqueId(), true);
     ASSERT_NE(delta_writer, nullptr);
 
     MemPool pool;
@@ -764,7 +764,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
                               30003, load_id,   tuple_desc,      
&(tuple_desc->slots())};
     DeltaWriter* delta_writer = nullptr;
-    DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
+    DeltaWriter::open(&write_req, &delta_writer, TUniqueId(), true);
     ASSERT_NE(delta_writer, nullptr);
 
     MemPool pool;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to