This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ee754307bb [refactor](load) refactor memtable flush actively (#21634)
ee754307bb is described below

commit ee754307bb62f509a929098421039a2d9acae579
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Sun Jul 30 21:31:54 2023 +0800

    [refactor](load) refactor memtable flush actively (#21634)
---
 be/src/common/config.cpp                         |   4 +-
 be/src/common/config.h                           |   4 +-
 be/src/common/daemon.cpp                         |  18 +-
 be/src/common/daemon.h                           |   4 +-
 be/src/olap/delta_writer.cpp                     |   6 +-
 be/src/olap/delta_writer.h                       |   2 +
 be/src/olap/memtable.cpp                         |   5 +-
 be/src/olap/memtable_memory_limiter.cpp          | 222 +++++++++++++++++++++++
 be/src/olap/memtable_memory_limiter.h            |  71 ++++++++
 be/src/runtime/exec_env.h                        |   3 +
 be/src/runtime/exec_env_init.cpp                 |   4 +
 be/src/runtime/load_channel.cpp                  |  13 +-
 be/src/runtime/load_channel.h                    |  64 ++-----
 be/src/runtime/load_channel_mgr.cpp              | 215 ++--------------------
 be/src/runtime/load_channel_mgr.h                |  43 ++---
 be/src/runtime/memory/mem_tracker_limiter.cpp    |   4 +-
 be/src/runtime/tablets_channel.cpp               |  85 ---------
 be/src/runtime/tablets_channel.h                 |   9 +-
 be/src/util/doris_metrics.h                      |   1 +
 be/test/olap/memtable_memory_limiter_test.cpp    | 182 +++++++++++++++++++
 docs/en/docs/admin-manual/config/be-config.md    |   5 +
 docs/zh-CN/docs/admin-manual/config/be-config.md |   5 +
 22 files changed, 574 insertions(+), 395 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index bbf7c3c6b0..74af2f754a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -551,8 +551,8 @@ DEFINE_mInt32(memory_maintenance_sleep_time_ms, "100");
 // After minor gc, no minor gc during sleep, but full gc is possible.
 DEFINE_mInt32(memory_gc_sleep_time_ms, "1000");
 
-// Sleep time in milliseconds between load channel memory refresh iterations
-DEFINE_mInt64(load_channel_memory_refresh_sleep_time_ms, "100");
+// Sleep time in milliseconds between memtable flush mgr refresh iterations
+DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "100");
 
 // Alignment
 DEFINE_Int32(memory_max_alignment, "16");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1f196d0904..c6e54d45b7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -592,8 +592,8 @@ DECLARE_mInt32(memory_maintenance_sleep_time_ms);
 // After minor gc, no minor gc during sleep, but full gc is possible.
 DECLARE_mInt32(memory_gc_sleep_time_ms);
 
-// Sleep time in milliseconds between load channel memory refresh iterations
-DECLARE_mInt64(load_channel_memory_refresh_sleep_time_ms);
+// Sleep time in milliseconds between memtable flush mgr memory refresh iterations
+DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms);
 
 // Alignment
 DECLARE_Int32(memory_max_alignment);
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index cfff81088c..e1d6e97bc1 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -40,12 +40,12 @@
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
+#include "olap/memtable_memory_limiter.h"
 #include "olap/options.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
 #include "runtime/block_spill_manager.h"
 #include "runtime/exec_env.h"
-#include "runtime/load_channel_mgr.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/task_group/task_group_manager.h"
@@ -276,14 +276,14 @@ void Daemon::memory_gc_thread() {
     }
 }
 
-void Daemon::load_channel_tracker_refresh_thread() {
+void Daemon::memtable_memory_limiter_tracker_refresh_thread() {
     // Refresh the memory statistics of the load channel tracker more frequently,
     // which helps to accurately control the memory of LoadChannelMgr.
     while (!_stop_background_threads_latch.wait_for(
-                   
std::chrono::milliseconds(config::load_channel_memory_refresh_sleep_time_ms)) &&
+                   
std::chrono::milliseconds(config::memtable_mem_tracker_refresh_interval_ms)) &&
            !k_doris_exit) {
         if (ExecEnv::GetInstance()->initialized()) {
-            
doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker();
+            
doris::ExecEnv::GetInstance()->memtable_memory_limiter()->refresh_mem_tracker();
         }
     }
 }
@@ -464,9 +464,9 @@ void Daemon::start() {
             &_memory_gc_thread);
     CHECK(st.ok()) << st;
     st = Thread::create(
-            "Daemon", "load_channel_tracker_refresh_thread",
-            [this]() { this->load_channel_tracker_refresh_thread(); },
-            &_load_channel_tracker_refresh_thread);
+            "Daemon", "memtable_memory_limiter_tracker_refresh_thread",
+            [this]() { this->memtable_memory_limiter_tracker_refresh_thread(); 
},
+            &_memtable_memory_limiter_tracker_refresh_thread);
     CHECK(st.ok()) << st;
     st = Thread::create(
             "Daemon", "memory_tracker_profile_refresh_thread",
@@ -498,8 +498,8 @@ void Daemon::stop() {
     if (_memory_gc_thread) {
         _memory_gc_thread->join();
     }
-    if (_load_channel_tracker_refresh_thread) {
-        _load_channel_tracker_refresh_thread->join();
+    if (_memtable_memory_limiter_tracker_refresh_thread) {
+        _memtable_memory_limiter_tracker_refresh_thread->join();
     }
     if (_memory_tracker_profile_refresh_thread) {
         _memory_tracker_profile_refresh_thread->join();
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 0d840d6452..39a8cd59f3 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -47,7 +47,7 @@ private:
     void tcmalloc_gc_thread();
     void memory_maintenance_thread();
     void memory_gc_thread();
-    void load_channel_tracker_refresh_thread();
+    void memtable_memory_limiter_tracker_refresh_thread();
     void memory_tracker_profile_refresh_thread();
     void calculate_metrics_thread();
     void block_spill_gc_thread();
@@ -56,7 +56,7 @@ private:
     scoped_refptr<Thread> _tcmalloc_gc_thread;
     scoped_refptr<Thread> _memory_maintenance_thread;
     scoped_refptr<Thread> _memory_gc_thread;
-    scoped_refptr<Thread> _load_channel_tracker_refresh_thread;
+    scoped_refptr<Thread> _memtable_memory_limiter_tracker_refresh_thread;
     scoped_refptr<Thread> _memory_tracker_profile_refresh_thread;
     scoped_refptr<Thread> _calculate_metrics_thread;
     scoped_refptr<Thread> _block_spill_gc_thread;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 88a7aa3b15..c3fed74c39 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -38,6 +38,7 @@
 #include "io/fs/file_writer.h" // IWYU pragma: keep
 #include "olap/memtable.h"
 #include "olap/memtable_flush_executor.h"
+#include "olap/memtable_memory_limiter.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/beta_rowset_writer.h"
@@ -51,7 +52,6 @@
 #include "olap/tablet_meta.h"
 #include "olap/txn_manager.h"
 #include "runtime/exec_env.h"
-#include "runtime/load_channel_mgr.h"
 #include "runtime/memory/mem_tracker.h"
 #include "service/backend_options.h"
 #include "util/brpc_client_cache.h"
@@ -334,11 +334,11 @@ void DeltaWriter::_reset_mem_table() {
     auto mem_table_insert_tracker = std::make_shared<MemTracker>(
             
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
                         std::to_string(tablet_id()), _mem_table_num, 
_load_id.to_string()),
-            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
+            ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
     auto mem_table_flush_tracker = std::make_shared<MemTracker>(
             
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
                         std::to_string(tablet_id()), _mem_table_num++, 
_load_id.to_string()),
-            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
+            ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
 #else
     auto mem_table_insert_tracker = std::make_shared<MemTracker>(
             
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index c4dc830af5..e45a8752e4 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -123,6 +123,8 @@ public:
 
     int64_t tablet_id() { return _tablet->tablet_id(); }
 
+    int64_t txn_id() { return _req.txn_id; }
+
     void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
 
     int64_t total_received_rows() const { return _total_received_rows; }
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index dc3d9d8be7..95854c640d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -27,11 +27,12 @@
 #include <vector>
 
 #include "common/config.h"
+#include "olap/memtable_memory_limiter.h"
 #include "olap/olap_define.h"
 #include "olap/tablet_schema.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
-#include "runtime/load_channel_mgr.h"
+#include "runtime/thread_context.h"
 #include "tablet_meta.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
@@ -61,7 +62,7 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* 
tablet_schema,
 #ifndef BE_TEST
     _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
             fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id)),
-            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
+            ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker());
 #else
     _insert_mem_tracker_use_hook = std::make_unique<MemTracker>(
             fmt::format("MemTableHookInsert:TabletId={}", 
std::to_string(tablet_id)));
diff --git a/be/src/olap/memtable_memory_limiter.cpp 
b/be/src/olap/memtable_memory_limiter.cpp
new file mode 100644
index 0000000000..bec748db6c
--- /dev/null
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memtable_memory_limiter.h"
+
+#include "common/config.h"
+#include "olap/delta_writer.h"
+#include "util/doris_metrics.h"
+#include "util/mem_info.h"
+#include "util/metrics.h"
+
+namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption, 
MetricUnit::BYTES, "",
+                                   memtable_memory_limiter_mem_consumption,
+                                   Labels({{"type", "load"}}));
+
+// Calculate the total memory limit of all load tasks on this BE
+static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
+    if (process_mem_limit == -1) {
+        // no limit
+        return -1;
+    }
+    int32_t max_load_memory_percent = 
config::load_process_max_memory_limit_percent;
+    return process_mem_limit * max_load_memory_percent / 100;
+}
+
+MemTableMemoryLimiter::MemTableMemoryLimiter() {}
+
+MemTableMemoryLimiter::~MemTableMemoryLimiter() {
+    DEREGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption);
+    for (auto writer : _writers) {
+        if (writer != nullptr) {
+            delete writer;
+            writer = nullptr;
+        }
+    }
+    _writers.clear();
+}
+
+Status MemTableMemoryLimiter::init(int64_t process_mem_limit) {
+    _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
+    _load_soft_mem_limit = _load_hard_mem_limit * 
config::load_process_soft_mem_limit_percent / 100;
+    _mem_tracker = 
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
+                                                       
"MemTableMemoryLimiter");
+    REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption,
+                         [this]() { return _mem_tracker->consumption(); });
+    return Status::OK();
+}
+
+void MemTableMemoryLimiter::register_writer(DeltaWriter* writer) {
+    std::lock_guard<std::mutex> l(_lock);
+    _writers.insert(writer);
+}
+
+void MemTableMemoryLimiter::deregister_writer(DeltaWriter* writer) {
+    std::lock_guard<std::mutex> l(_lock);
+    _writers.erase(writer);
+}
+
+void MemTableMemoryLimiter::handle_memtable_flush() {
+    // Check the soft limit.
+    DCHECK(_load_soft_mem_limit > 0);
+    // Record current memory status.
+    int64_t process_soft_mem_limit = MemInfo::soft_mem_limit();
+    int64_t proc_mem_no_allocator_cache = 
MemInfo::proc_mem_no_allocator_cache();
+#ifndef BE_TEST
+    // If process memory is almost full but data load doesn't consume more than 5% (50% * 10%) of
+    // total memory, we don't need to flush memtable.
+    bool reduce_on_process_soft_mem_limit =
+            proc_mem_no_allocator_cache >= process_soft_mem_limit &&
+            _mem_tracker->consumption() >= _load_hard_mem_limit / 10;
+    if (_mem_tracker->consumption() < _load_soft_mem_limit && 
!reduce_on_process_soft_mem_limit) {
+        return;
+    }
+#endif
+    // Indicate whether current thread is reducing mem on hard limit.
+    bool reducing_mem_on_hard_limit = false;
+    Status st;
+    std::vector<WriterMemItem> writers_to_reduce_mem;
+    {
+        MonotonicStopWatch timer;
+        timer.start();
+        std::unique_lock<std::mutex> l(_lock);
+        while (_should_wait_flush) {
+            _wait_flush_cond.wait(l);
+        }
+        LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
+                  << ", waited for flush, time_ns:" << timer.elapsed_time();
+#ifndef BE_TEST
+        bool hard_limit_reached = _mem_tracker->consumption() >= 
_load_hard_mem_limit ||
+                                  proc_mem_no_allocator_cache >= 
process_soft_mem_limit;
+        // Some other thread is flushing data, and not reached hard limit now,
+        // we don't need to handle mem limit in current thread.
+        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
+            return;
+        }
+#endif
+
+        auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
+            return lhs.mem_size < rhs.mem_size;
+        };
+        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, 
decltype(cmp)> mem_heap(cmp);
+
+        for (auto& writer : _writers) {
+            int64_t active_memtable_mem = 
writer->active_memtable_mem_consumption();
+            mem_heap.emplace(writer, active_memtable_mem);
+        }
+        int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
+        int64_t mem_consumption_in_picked_writer = 0;
+        while (!mem_heap.empty()) {
+            WriterMemItem mem_item = mem_heap.top();
+            auto writer = mem_item.writer;
+            int64_t mem_size = mem_item.mem_size;
+            writers_to_reduce_mem.emplace_back(writer, mem_size);
+            st = writer->flush_memtable_and_wait(false);
+            if (!st.ok()) {
+                auto err_msg = fmt::format(
+                        "tablet writer failed to reduce mem consumption by 
flushing memtable, "
+                        "tablet_id={}, txn_id={}, err={}",
+                        writer->tablet_id(), writer->txn_id(), st.to_string());
+                LOG(WARNING) << err_msg;
+                writer->cancel_with_status(st);
+            }
+            mem_consumption_in_picked_writer += mem_size;
+            if (mem_consumption_in_picked_writer > mem_to_flushed) {
+                break;
+            }
+            mem_heap.pop();
+        }
+        if (writers_to_reduce_mem.empty()) {
+            // should not happen, add log to observe
+            LOG(WARNING) << "failed to find suitable writers to reduce memory"
+                         << " when total load mem limit exceed";
+            return;
+        }
+
+        std::ostringstream oss;
+        oss << "reducing memory of " << writers_to_reduce_mem.size()
+            << " delta writers (total mem: "
+            << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer)
+            << ", max mem: " << 
PrettyPrinter::print_bytes(writers_to_reduce_mem.front().mem_size)
+            << ", min mem:" << 
PrettyPrinter::print_bytes(writers_to_reduce_mem.back().mem_size)
+            << "), ";
+        if (proc_mem_no_allocator_cache < process_soft_mem_limit) {
+            oss << "because total load mem consumption "
+                << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << 
" has exceeded";
+            if (_mem_tracker->consumption() > _load_hard_mem_limit) {
+                _should_wait_flush = true;
+                reducing_mem_on_hard_limit = true;
+                oss << " hard limit: " << 
PrettyPrinter::print_bytes(_load_hard_mem_limit);
+            } else {
+                _soft_reduce_mem_in_progress = true;
+                oss << " soft limit: " << 
PrettyPrinter::print_bytes(_load_soft_mem_limit);
+            }
+        } else {
+            _should_wait_flush = true;
+            reducing_mem_on_hard_limit = true;
+            oss << "because proc_mem_no_allocator_cache consumption "
+                << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache)
+                << ", has exceeded process soft limit "
+                << PrettyPrinter::print_bytes(process_soft_mem_limit)
+                << ", total load mem consumption: "
+                << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+                << ", vm_rss: " << PerfCounters::get_vm_rss_str();
+        }
+        LOG(INFO) << oss.str();
+    }
+
+    // wait all writers flush without lock
+    for (auto item : writers_to_reduce_mem) {
+        VLOG_NOTICE << "reducing memory, wait flush mem_size: "
+                    << PrettyPrinter::print_bytes(item.mem_size);
+        st = item.writer->wait_flush();
+        if (!st.ok()) {
+            auto err_msg = fmt::format(
+                    "tablet writer failed to reduce mem consumption by 
flushing memtable, "
+                    "tablet_id={}, txn_id={}, err={}",
+                    item.writer->tablet_id(), item.writer->txn_id(), 
st.to_string());
+            LOG(WARNING) << err_msg;
+            item.writer->cancel_with_status(st);
+        }
+    }
+
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        // If a thread has finished the memtable flush for soft limit, and now
+        // the hard limit is already reached, it should not update these variables.
+        if (reducing_mem_on_hard_limit && _should_wait_flush) {
+            _should_wait_flush = false;
+            _wait_flush_cond.notify_all();
+        }
+        if (_soft_reduce_mem_in_progress) {
+            _soft_reduce_mem_in_progress = false;
+        }
+        // refresh mem tracker to avoid duplicate reduce
+        _refresh_mem_tracker_without_lock();
+    }
+}
+
+void MemTableMemoryLimiter::_refresh_mem_tracker_without_lock() {
+    _mem_usage = 0;
+    for (auto& writer : _writers) {
+        _mem_usage += writer->mem_consumption(MemType::ALL);
+    }
+    THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), 
_mem_tracker.get());
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/memtable_memory_limiter.h 
b/be/src/olap/memtable_memory_limiter.h
new file mode 100644
index 0000000000..37cb710108
--- /dev/null
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "util/countdown_latch.h"
+
+namespace doris {
+class DeltaWriter;
+struct WriterMemItem {
+    DeltaWriter* writer;
+    int64_t mem_size;
+};
+class MemTableMemoryLimiter {
+public:
+    MemTableMemoryLimiter();
+    ~MemTableMemoryLimiter();
+
+    Status init(int64_t process_mem_limit);
+
+    // check if the total mem consumption exceeds limit.
+    // If yes, it will flush memtable to try to reduce memory consumption.
+    void handle_memtable_flush();
+
+    void register_writer(DeltaWriter* writer);
+
+    void deregister_writer(DeltaWriter* writer);
+
+    void refresh_mem_tracker() {
+        std::lock_guard<std::mutex> l(_lock);
+        _refresh_mem_tracker_without_lock();
+    }
+
+    MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); }
+
+private:
+    void _refresh_mem_tracker_without_lock();
+
+    std::mutex _lock;
+    // If hard limit reached, one thread will trigger load channel flush,
+    // other threads should wait on the condition variable.
+    bool _should_wait_flush = false;
+    std::condition_variable _wait_flush_cond;
+    int64_t _mem_usage = 0;
+
+    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    int64_t _load_hard_mem_limit = -1;
+    int64_t _load_soft_mem_limit = -1;
+    bool _soft_reduce_mem_in_progress = false;
+
+    std::unordered_set<DeltaWriter*> _writers;
+};
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 056f2ca125..3077eaa3f5 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -27,6 +27,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "olap/memtable_memory_limiter.h"
 #include "olap/options.h"
 #include "util/threadpool.h"
 
@@ -176,6 +177,7 @@ public:
     HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
     doris::vectorized::ScannerScheduler* scanner_scheduler() { return 
_scanner_scheduler; }
     FileMetaCache* file_meta_cache() { return _file_meta_cache; }
+    MemTableMemoryLimiter* memtable_memory_limiter() { return 
_memtable_memory_limiter.get(); }
 
     // only for unit test
     void set_master_info(TMasterInfo* master_info) { this->_master_info = 
master_info; }
@@ -261,6 +263,7 @@ private:
     BlockSpillManager* _block_spill_mgr = nullptr;
     // To save meta info of external file, such as parquet footer.
     FileMetaCache* _file_meta_cache = nullptr;
+    std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index f200d0a46f..ddec82544d 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -37,6 +37,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/fs/file_meta_cache.h"
+#include "olap/memtable_memory_limiter.h"
 #include "olap/olap_define.h"
 #include "olap/options.h"
 #include "olap/page_cache.h"
@@ -168,6 +169,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
     _small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
     _block_spill_mgr = new BlockSpillManager(_store_paths);
     _file_meta_cache = new 
FileMetaCache(config::max_external_file_meta_cache_num);
+    _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
 
     _backend_client_cache->init_metrics("backend");
     _frontend_client_cache->init_metrics("frontend");
@@ -184,6 +186,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
 
     _init_mem_env();
 
+    RETURN_IF_ERROR(_memtable_memory_limiter->init(MemInfo::mem_limit()));
     RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
     _heartbeat_flags = new HeartbeatFlags();
     _register_metrics();
@@ -407,6 +410,7 @@ void ExecEnv::_destroy() {
     SAFE_DELETE(_master_info);
 
     _new_load_stream_mgr.reset();
+    _memtable_memory_limiter.reset(nullptr);
     _send_batch_thread_pool.reset(nullptr);
     _buffered_reader_prefetch_thread_pool.reset(nullptr);
     _send_report_thread_pool.reset(nullptr);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 29ebb63725..90432a386d 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,11 +25,9 @@
 
 namespace doris {
 
-LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> 
mem_tracker,
-                         int64_t timeout_s, bool is_high_priority, const 
std::string& sender_ip,
-                         int64_t backend_id, bool enable_profile)
+LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool 
is_high_priority,
+                         const std::string& sender_ip, int64_t backend_id, 
bool enable_profile)
         : _load_id(load_id),
-          _mem_tracker(std::move(mem_tracker)),
           _timeout_s(timeout_s),
           _is_high_priority(is_high_priority),
           _sender_ip(sender_ip),
@@ -43,9 +41,9 @@ LoadChannel::LoadChannel(const UniqueId& load_id, 
std::unique_ptr<MemTracker> me
 }
 
 LoadChannel::~LoadChannel() {
-    LOG(INFO) << "load channel removed. mem peak usage=" << 
_mem_tracker->peak_consumption()
-              << ", info=" << _mem_tracker->debug_string() << ", load_id=" << 
_load_id
-              << ", is high priority=" << _is_high_priority << ", sender_ip=" 
<< _sender_ip;
+    LOG(INFO) << "load channel removed"
+              << " load_id=" << _load_id << ", is high priority=" << 
_is_high_priority
+              << ", sender_ip=" << _sender_ip;
 }
 
 void LoadChannel::_init_profile() {
@@ -148,7 +146,6 @@ void 
LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
         return;
     }
 
-    COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption());
     // TabletSink and LoadChannel in BE are M: N relationship,
     // Every once in a while LoadChannel will randomly return its own runtime profile to a TabletSink,
     // so usually all LoadChannel runtime profiles are saved on each TabletSink,
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 3b5b4b2d58..0ad24c5697 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -35,6 +35,8 @@
 #include <vector>
 
 #include "common/status.h"
+#include "olap/memtable_memory_limiter.h"
+#include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/tablets_channel.h"
 #include "util/runtime_profile.h"
@@ -52,9 +54,8 @@ class OpenPartitionRequest;
 // corresponding to a certain load job
 class LoadChannel {
 public:
-    LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> 
mem_tracker, int64_t timeout_s,
-                bool is_high_priority, const std::string& sender_ip, int64_t 
backend_id,
-                bool enable_profile);
+    LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool 
is_high_priority,
+                const std::string& sender_ip, int64_t backend_id, bool 
enable_profile);
     ~LoadChannel();
 
     // open a new load channel if not exist
@@ -73,52 +74,17 @@ public:
 
     const UniqueId& load_id() const { return _load_id; }
 
-    int64_t mem_consumption() {
-        int64_t mem_usage = 0;
-        {
-            std::lock_guard<SpinLock> l(_tablets_channels_lock);
-            for (auto& it : _tablets_channels) {
-                mem_usage += it.second->mem_consumption();
-            }
-        }
-        _mem_tracker->set_consumption(mem_usage);
-        return mem_usage;
-    }
-
-    void get_active_memtable_mem_consumption(
-            std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, 
std::greater<int64_t>>>>*
-                    writers_mem_snap) {
-        std::lock_guard<SpinLock> l(_tablets_channels_lock);
-        for (auto& it : _tablets_channels) {
-            std::multimap<int64_t, int64_t, std::greater<int64_t>> 
tablets_channel_mem;
-            
it.second->get_active_memtable_mem_consumption(&tablets_channel_mem);
-            writers_mem_snap->emplace_back(it.first, 
std::move(tablets_channel_mem));
-        }
-    }
-
     int64_t timeout() const { return _timeout_s; }
 
     bool is_high_priority() const { return _is_high_priority; }
 
-    void flush_memtable_async(int64_t index_id, int64_t tablet_id) {
-        std::lock_guard<std::mutex> l(_lock);
-        auto it = _tablets_channels.find(index_id);
-        if (it != _tablets_channels.end()) {
-            it->second->flush_memtable_async(tablet_id);
-        }
-    }
-
-    void wait_flush(int64_t index_id, int64_t tablet_id) {
-        std::lock_guard<std::mutex> l(_lock);
-        auto it = _tablets_channels.find(index_id);
-        if (it != _tablets_channels.end()) {
-            it->second->wait_flush(tablet_id);
-        }
-    }
-
     RuntimeProfile::Counter* get_mgr_add_batch_timer() { return 
_mgr_add_batch_timer; }
     RuntimeProfile::Counter* get_handle_mem_limit_timer() { return 
_handle_mem_limit_timer; }
 
+    std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> 
get_tablets_channels() {
+        return _tablets_channels;
+    }
+
 protected:
     Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, 
bool& is_finished,
                                 const int64_t index_id);
@@ -138,7 +104,14 @@ protected:
             std::lock_guard<std::mutex> l(_lock);
             {
                 std::lock_guard<SpinLock> l(_tablets_channels_lock);
-                _tablets_channels.erase(index_id);
+                auto memtable_memory_limiter = 
ExecEnv::GetInstance()->memtable_memory_limiter();
+                auto tablet_channel_it = _tablets_channels.find(index_id);
+                if (tablet_channel_it != _tablets_channels.end()) {
+                    for (auto& writer_it : 
tablet_channel_it->second->get_tablet_writers()) {
+                        
memtable_memory_limiter->deregister_writer(writer_it.second);
+                    }
+                    _tablets_channels.erase(index_id);
+                }
             }
             _finished_channel_ids.emplace(index_id);
         }
@@ -151,8 +124,6 @@ protected:
 
 private:
     UniqueId _load_id;
-    // Tracks the total memory consumed by current load job on this BE
-    std::unique_ptr<MemTracker> _mem_tracker;
 
     std::unique_ptr<RuntimeProfile> _profile;
     RuntimeProfile* _self_profile;
@@ -167,6 +138,7 @@ private:
     // lock protect the tablets channel map
     std::mutex _lock;
     // index id -> tablets channel
+    // when you erase, you should call deregister_writer method in 
MemTableMemoryLimiter;
     std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> 
_tablets_channels;
     SpinLock _tablets_channels_lock;
     // This is to save finished channels id, to handle the retry request.
@@ -192,7 +164,7 @@ private:
 };
 
 inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
-    os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << 
load_channel.mem_consumption()
+    os << "LoadChannel(id=" << load_channel.load_id()
        << ", last_update_time=" << 
static_cast<uint64_t>(load_channel.last_updated_time())
        << ", is high priority: " << load_channel.is_high_priority() << ")";
     return os;
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 6862b99b2f..df77840c56 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -57,16 +57,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(load_channel_count, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(load_channel_mem_consumption, 
MetricUnit::BYTES, "",
                                    mem_consumption, Labels({{"type", 
"load"}}));
 
-// Calculate the total memory limit of all load tasks on this BE
-static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
-    if (process_mem_limit == -1) {
-        // no limit
-        return -1;
-    }
-    int32_t max_load_memory_percent = 
config::load_process_max_memory_limit_percent;
-    return process_mem_limit * max_load_memory_percent / 100;
-}
-
 static int64_t calc_channel_timeout_s(int64_t timeout_in_req_s) {
     int64_t load_channel_timeout_s = 
config::streaming_load_rpc_max_alive_time_sec;
     if (timeout_in_req_s > 0) {
@@ -93,12 +83,7 @@ LoadChannelMgr::~LoadChannelMgr() {
 }
 
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
-    _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
-    _load_soft_mem_limit = _load_hard_mem_limit * 
config::load_process_soft_mem_limit_percent / 100;
-    _mem_tracker =
-            std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD, 
"LoadChannelMgr");
-    REGISTER_HOOK_METRIC(load_channel_mem_consumption,
-                         [this]() { return _mem_tracker->consumption(); });
+    _memtable_memory_limiter = 
ExecEnv::GetInstance()->memtable_memory_limiter();
     _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
     RETURN_IF_ERROR(_start_bg_worker());
     return Status::OK();
@@ -119,24 +104,18 @@ Status LoadChannelMgr::open(const 
PTabletWriterOpenRequest& params) {
             int64_t channel_timeout_s = 
calc_channel_timeout_s(timeout_in_req_s);
             bool is_high_priority = (params.has_is_high_priority() && 
params.is_high_priority());
 
-            // Use the same mem limit as LoadChannelMgr for a single load 
channel
-#ifndef BE_TEST
-            auto channel_mem_tracker = std::make_unique<MemTracker>(
-                    fmt::format("LoadChannel#senderIp={}#loadID={}", 
params.sender_ip(),
-                                load_id.to_string()),
-                    ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
-#else
-            auto channel_mem_tracker = 
std::make_unique<MemTracker>(fmt::format(
-                    "LoadChannel#senderIp={}#loadID={}", params.sender_ip(), 
load_id.to_string()));
-#endif
-            channel.reset(new LoadChannel(load_id, 
std::move(channel_mem_tracker),
-                                          channel_timeout_s, is_high_priority, 
params.sender_ip(),
-                                          params.backend_id(), 
params.enable_profile()));
+            channel.reset(new LoadChannel(load_id, channel_timeout_s, 
is_high_priority,
+                                          params.sender_ip(), 
params.backend_id(),
+                                          params.enable_profile()));
             _load_channels.insert({load_id, channel});
         }
     }
 
     RETURN_IF_ERROR(channel->open(params));
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        _register_channel_all_writers(channel);
+    }
     return Status::OK();
 }
 
@@ -182,7 +161,7 @@ Status LoadChannelMgr::add_batch(const 
PTabletWriterAddBlockRequest& request,
         // If this is a high priority load task, do not handle this.
         // because this may block for a while, which may lead to rpc timeout.
         SCOPED_TIMER(channel->get_handle_mem_limit_timer());
-        _handle_mem_exceed_limit();
+        _memtable_memory_limiter->handle_memtable_flush();
     }
 
     // 3. add batch to load channel
@@ -205,7 +184,10 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId 
load_id) {
     VLOG_NOTICE << "removing load channel " << load_id << " because it's 
finished";
     {
         std::lock_guard<std::mutex> l(_lock);
-        _load_channels.erase(load_id);
+        if (_load_channels.find(load_id) != _load_channels.end()) {
+            
_deregister_channel_all_writers(_load_channels.find(load_id)->second);
+            _load_channels.erase(load_id);
+        }
         auto handle = _last_success_channel->insert(load_id.to_string(), 
nullptr, 1, dummy_deleter);
         _last_success_channel->release(handle);
     }
@@ -219,6 +201,7 @@ Status LoadChannelMgr::cancel(const 
PTabletWriterCancelRequest& params) {
         std::lock_guard<std::mutex> l(_lock);
         if (_load_channels.find(load_id) != _load_channels.end()) {
             cancelled_channel = _load_channels[load_id];
+            _deregister_channel_all_writers(cancelled_channel);
             _load_channels.erase(load_id);
         }
     }
@@ -263,6 +246,7 @@ Status LoadChannelMgr::_start_load_channels_clean() {
         }
 
         for (auto& key : need_delete_channel_ids) {
+            _deregister_channel_all_writers(_load_channels.find(key)->second);
             _load_channels.erase(key);
             LOG(INFO) << "erase timeout load channel: " << key;
         }
@@ -277,175 +261,6 @@ Status LoadChannelMgr::_start_load_channels_clean() {
                   << ", timeout(s): " << channel->timeout();
     }
 
-    // this log print every 1 min, so that we could observe the mem 
consumption of load process
-    // on this Backend
-    LOG(INFO) << "load mem consumption(bytes). limit: " << _load_hard_mem_limit
-              << ", current: " << _mem_tracker->consumption()
-              << ", peak: " << _mem_tracker->peak_consumption()
-              << ", total running load channels: " << _load_channels.size();
-
     return Status::OK();
 }
-
-void LoadChannelMgr::_handle_mem_exceed_limit() {
-    // Check the soft limit.
-    DCHECK(_load_soft_mem_limit > 0);
-    // Record current memory status.
-    int64_t process_soft_mem_limit = MemInfo::soft_mem_limit();
-    int64_t proc_mem_no_allocator_cache = 
MemInfo::proc_mem_no_allocator_cache();
-    // If process memory is almost full but data load don't consume more than 
5% (50% * 10%) of
-    // total memory, we don't need to reduce memory of load jobs.
-    bool reduce_on_process_soft_mem_limit =
-            proc_mem_no_allocator_cache >= process_soft_mem_limit &&
-            _mem_tracker->consumption() >= _load_hard_mem_limit / 10;
-    if (_mem_tracker->consumption() < _load_soft_mem_limit && 
!reduce_on_process_soft_mem_limit) {
-        return;
-    }
-    // Indicate whether current thread is reducing mem on hard limit.
-    bool reducing_mem_on_hard_limit = false;
-    // tuple<LoadChannel, index_id, tablet_id, mem_size>
-    std::vector<std::tuple<std::shared_ptr<LoadChannel>, int64_t, int64_t, 
int64_t>>
-            writers_to_reduce_mem;
-    {
-        MonotonicStopWatch timer;
-        timer.start();
-        std::unique_lock<std::mutex> l(_lock);
-        while (_should_wait_flush) {
-            _wait_flush_cond.wait(l);
-        }
-        LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit
-                  << ", waited for flush, time_ns:" << timer.elapsed_time();
-
-        bool hard_limit_reached = _mem_tracker->consumption() >= 
_load_hard_mem_limit ||
-                                  proc_mem_no_allocator_cache >= 
process_soft_mem_limit;
-        // Some other thread is flushing data, and not reached hard limit now,
-        // we don't need to handle mem limit in current thread.
-        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
-            return;
-        }
-
-        // tuple<LoadChannel, index_id, multimap<mem size, tablet_id>>
-        using WritersMem = std::tuple<std::shared_ptr<LoadChannel>, int64_t,
-                                      std::multimap<int64_t, int64_t, 
std::greater<int64_t>>>;
-        std::vector<WritersMem> all_writers_mem;
-
-        // tuple<current iterator in multimap, end iterator in multimap, pos 
in all_writers_mem>
-        using WriterMemItem =
-                std::tuple<std::multimap<int64_t, int64_t, 
std::greater<int64_t>>::iterator,
-                           std::multimap<int64_t, int64_t, 
std::greater<int64_t>>::iterator,
-                           size_t>;
-        auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
-            return std::get<0>(lhs)->first < std::get<0>(rhs)->first;
-        };
-        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, 
decltype(cmp)>
-                tablets_mem_heap(cmp);
-
-        for (auto& kv : _load_channels) {
-            if (kv.second->is_high_priority()) {
-                // do not select high priority channel to reduce memory
-                // to avoid blocking the.
-                continue;
-            }
-            std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, 
std::greater<int64_t>>>>
-                    writers_mem_snap;
-            kv.second->get_active_memtable_mem_consumption(&writers_mem_snap);
-            for (auto item : writers_mem_snap) {
-                // multimap is empty
-                if (item.second.empty()) {
-                    continue;
-                }
-                all_writers_mem.emplace_back(kv.second, item.first, 
std::move(item.second));
-            }
-        }
-        for (size_t i = 0; i < all_writers_mem.size(); i++) {
-            tablets_mem_heap.emplace(std::get<2>(all_writers_mem[i]).begin(),
-                                     std::get<2>(all_writers_mem[i]).end(), i);
-        }
-
-        // reduce 1/10 memory every time
-        int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
-        int64_t mem_consumption_in_picked_writer = 0;
-        while (!tablets_mem_heap.empty()) {
-            WriterMemItem tablet_mem_item = tablets_mem_heap.top();
-            size_t pos = std::get<2>(tablet_mem_item);
-            auto load_channel = std::get<0>(all_writers_mem[pos]);
-            int64_t index_id = std::get<1>(all_writers_mem[pos]);
-            int64_t tablet_id = std::get<0>(tablet_mem_item)->second;
-            int64_t mem_size = std::get<0>(tablet_mem_item)->first;
-            writers_to_reduce_mem.emplace_back(load_channel, index_id, 
tablet_id, mem_size);
-            load_channel->flush_memtable_async(index_id, tablet_id);
-            mem_consumption_in_picked_writer += 
std::get<0>(tablet_mem_item)->first;
-            if (mem_consumption_in_picked_writer > mem_to_flushed) {
-                break;
-            }
-            tablets_mem_heap.pop();
-            if (++std::get<0>(tablet_mem_item) != 
std::get<1>(tablet_mem_item)) {
-                tablets_mem_heap.push(tablet_mem_item);
-            }
-        }
-
-        if (writers_to_reduce_mem.empty()) {
-            // should not happen, add log to observe
-            LOG(WARNING) << "failed to find suitable writers to reduce memory"
-                         << " when total load mem limit exceed";
-            return;
-        }
-
-        std::ostringstream oss;
-        oss << "reducing memory of " << writers_to_reduce_mem.size()
-            << " delta writers (total mem: "
-            << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) << 
", max mem: "
-            << 
PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.front()))
-            << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.front())
-            << ", min mem:" << 
PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.back()))
-            << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.back()) << 
"), ";
-        if (proc_mem_no_allocator_cache < process_soft_mem_limit) {
-            oss << "because total load mem consumption "
-                << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << 
" has exceeded";
-            if (_mem_tracker->consumption() > _load_hard_mem_limit) {
-                _should_wait_flush = true;
-                reducing_mem_on_hard_limit = true;
-                oss << " hard limit: " << 
PrettyPrinter::print_bytes(_load_hard_mem_limit);
-            } else {
-                _soft_reduce_mem_in_progress = true;
-                oss << " soft limit: " << 
PrettyPrinter::print_bytes(_load_soft_mem_limit);
-            }
-        } else {
-            _should_wait_flush = true;
-            reducing_mem_on_hard_limit = true;
-            oss << "because proc_mem_no_allocator_cache consumption "
-                << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache)
-                << ", has exceeded process soft limit "
-                << PrettyPrinter::print_bytes(process_soft_mem_limit)
-                << ", total load mem consumption: "
-                << PrettyPrinter::print_bytes(_mem_tracker->consumption())
-                << ", vm_rss: " << PerfCounters::get_vm_rss_str();
-        }
-        LOG(INFO) << oss.str();
-    }
-
-    // wait all writers flush without lock
-    for (auto item : writers_to_reduce_mem) {
-        VLOG_NOTICE << "reducing memory, wait flush load_id: " << 
std::get<0>(item)->load_id()
-                    << ", index_id: " << std::get<1>(item) << ", tablet_id: " 
<< std::get<2>(item)
-                    << ", mem_size: " << 
PrettyPrinter::print_bytes(std::get<3>(item));
-        std::get<0>(item)->wait_flush(std::get<1>(item), std::get<2>(item));
-    }
-
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        // If a thread have finished the memtable flush for soft limit, and now
-        // the hard limit is already reached, it should not update these 
variables.
-        if (reducing_mem_on_hard_limit && _should_wait_flush) {
-            _should_wait_flush = false;
-            _wait_flush_cond.notify_all();
-        }
-        if (_soft_reduce_mem_in_progress) {
-            _soft_reduce_mem_in_progress = false;
-        }
-        // refresh mem tacker to avoid duplicate reduce
-        _refresh_mem_tracker_without_lock();
-    }
-}
-
 } // namespace doris
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index cad611b75b..db137aa5c0 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -30,6 +30,7 @@
 #include "common/status.h"
 #include "gutil/ref_counted.h"
 #include "olap/lru_cache.h"
+#include "olap/memtable_memory_limiter.h"
 #include "runtime/load_channel.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/thread_context.h"
@@ -60,51 +61,39 @@ public:
     // cancel all tablet stream for 'load_id' load
     Status cancel(const PTabletWriterCancelRequest& request);
 
-    void refresh_mem_tracker() {
-        std::lock_guard<std::mutex> l(_lock);
-        _refresh_mem_tracker_without_lock();
-    }
-    MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); }
-
 private:
     Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& 
is_eof,
                              const UniqueId& load_id, const 
PTabletWriterAddBlockRequest& request);
 
     void _finish_load_channel(UniqueId load_id);
-    // check if the total load mem consumption exceeds limit.
-    // If yes, it will pick a load channel to try to reduce memory consumption.
-    void _handle_mem_exceed_limit();
 
     Status _start_bg_worker();
 
-    // lock should be held when calling this method
-    void _refresh_mem_tracker_without_lock() {
-        _mem_usage = 0;
-        for (auto& kv : _load_channels) {
-            _mem_usage += kv.second->mem_consumption();
+    void _register_channel_all_writers(std::shared_ptr<doris::LoadChannel> 
channel) {
+        for (auto& tablet_channel_it : channel->get_tablets_channels()) {
+            for (auto& writer_it : 
tablet_channel_it.second->get_tablet_writers()) {
+                _memtable_memory_limiter->register_writer(writer_it.second);
+            }
+        }
+    }
+
+    void _deregister_channel_all_writers(std::shared_ptr<doris::LoadChannel> 
channel) {
+        for (auto& tablet_channel_it : channel->get_tablets_channels()) {
+            for (auto& writer_it : 
tablet_channel_it.second->get_tablet_writers()) {
+                _memtable_memory_limiter->deregister_writer(writer_it.second);
+            }
         }
-        THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - 
_mem_tracker->consumption(),
-                                       _mem_tracker.get());
     }
 
 protected:
     // lock protect the load channel map
     std::mutex _lock;
     // load id -> load channel
+    // when you erase, you should call deregister_writer method in 
MemTableMemoryLimiter;
     std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
     Cache* _last_success_channel = nullptr;
 
-    // check the total load channel mem consumption of this Backend
-    int64_t _mem_usage = 0;
-    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
-    int64_t _load_hard_mem_limit = -1;
-    int64_t _load_soft_mem_limit = -1;
-    bool _soft_reduce_mem_in_progress = false;
-
-    // If hard limit reached, one thread will trigger load channel flush,
-    // other threads should wait on the condition variable.
-    bool _should_wait_flush = false;
-    std::condition_variable _wait_flush_cond;
+    MemTableMemoryLimiter* _memtable_memory_limiter = nullptr;
 
     CountDownLatch _stop_background_threads_latch;
     // thread to clean timeout load channels
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 517589f767..610af53c77 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -26,9 +26,9 @@
 #include <queue>
 #include <utility>
 
+#include "olap/memtable_memory_limiter.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
-#include "runtime/load_channel_mgr.h"
 #include "runtime/task_group/task_group.h"
 #include "service/backend_options.h"
 #include "util/mem_info.h"
@@ -240,7 +240,7 @@ std::string MemTrackerLimiter::log_process_usage_str() {
 
     // Add additional tracker printed when memory exceeds limit.
     snapshots.emplace_back(
-            
ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()->make_snapshot());
+            
ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->make_snapshot());
 
     detail += "\nMemory Tracker Summary:";
     for (const auto& snapshot : snapshots) {
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 6d94b00007..7f45b404b7 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -291,18 +291,6 @@ void TabletsChannel::_add_error_tablet(
                   << "err msg " << error;
 }
 
-int64_t TabletsChannel::mem_consumption() {
-    int64_t mem_usage = 0;
-    {
-        std::lock_guard<SpinLock> l(_tablet_writers_lock);
-        for (auto& it : _tablet_writers) {
-            int64_t writer_mem = it.second->mem_consumption(MemType::ALL);
-            mem_usage += writer_mem;
-        }
-    }
-    return mem_usage;
-}
-
 void TabletsChannel::refresh_profile() {
     int64_t write_mem_usage = 0;
     int64_t flush_mem_usage = 0;
@@ -330,16 +318,6 @@ void TabletsChannel::refresh_profile() {
     COUNTER_SET(_max_tablet_flush_memory_usage_counter, 
max_tablet_flush_mem_usage);
 }
 
-void TabletsChannel::get_active_memtable_mem_consumption(
-        std::multimap<int64_t, int64_t, std::greater<int64_t>>* 
mem_consumptions) {
-    mem_consumptions->clear();
-    std::lock_guard<SpinLock> l(_tablet_writers_lock);
-    for (auto& it : _tablet_writers) {
-        int64_t active_memtable_mem = 
it.second->active_memtable_mem_consumption();
-        mem_consumptions->emplace(active_memtable_mem, it.first);
-    }
-}
-
 Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& 
request) {
     std::vector<SlotDescriptor*>* index_slots = nullptr;
     int32_t schema_hash = 0;
@@ -501,69 +479,6 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
     return Status::OK();
 }
 
-void TabletsChannel::flush_memtable_async(int64_t tablet_id) {
-    std::lock_guard<std::mutex> l(_lock);
-    if (_state == kFinished) {
-        // TabletsChannel is closed without LoadChannel's lock,
-        // therefore it's possible for reduce_mem_usage() to be called right 
after close()
-        LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: 
" << _txn_id
-                  << ", index_id: " << _index_id;
-        return;
-    }
-
-    auto iter = _tablet_writers.find(tablet_id);
-    if (iter == _tablet_writers.end()) {
-        return;
-    }
-
-    if (!(_reducing_tablets.insert(tablet_id).second)) {
-        return;
-    }
-
-    Status st = iter->second->flush_memtable_and_wait(false);
-    if (!st.ok()) {
-        auto err_msg = fmt::format(
-                "tablet writer failed to reduce mem consumption by flushing 
memtable, "
-                "tablet_id={}, txn_id={}, err={}",
-                tablet_id, _txn_id, st.to_string());
-        LOG(WARNING) << err_msg;
-        iter->second->cancel_with_status(st);
-        _add_broken_tablet(tablet_id);
-    }
-}
-
-void TabletsChannel::wait_flush(int64_t tablet_id) {
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        if (_state == kFinished) {
-            // TabletsChannel is closed without LoadChannel's lock,
-            // therefore it's possible for reduce_mem_usage() to be called 
right after close()
-            LOG(INFO) << "TabletsChannel is closed when reduce mem usage, 
txn_id: " << _txn_id
-                      << ", index_id: " << _index_id;
-            return;
-        }
-    }
-
-    auto iter = _tablet_writers.find(tablet_id);
-    if (iter == _tablet_writers.end()) {
-        return;
-    }
-    Status st = iter->second->wait_flush();
-    if (!st.ok()) {
-        auto err_msg = fmt::format(
-                "tablet writer failed to reduce mem consumption by flushing 
memtable, "
-                "tablet_id={}, txn_id={}, err={}",
-                tablet_id, _txn_id, st.to_string());
-        LOG(WARNING) << err_msg;
-        iter->second->cancel_with_status(st);
-        _add_broken_tablet(tablet_id);
-    }
-
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        _reducing_tablets.erase(tablet_id);
-    }
-}
 void TabletsChannel::_add_broken_tablet(int64_t tablet_id) {
     std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock);
     _broken_tablets.insert(tablet_id);
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index b31aaee214..17bdcf5ec8 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -110,15 +110,9 @@ public:
     // no-op when this channel has been closed or cancelled
     Status cancel();
 
-    int64_t mem_consumption();
-
     void refresh_profile();
 
-    void get_active_memtable_mem_consumption(
-            std::multimap<int64_t, int64_t, std::greater<int64_t>>* 
mem_consumptions);
-
-    void flush_memtable_async(int64_t tablet_id);
-    void wait_flush(int64_t tablet_id);
+    std::unordered_map<int64_t, DeltaWriter*> get_tablet_writers() { return 
_tablet_writers; }
 
 private:
     template <typename Request>
@@ -175,6 +169,7 @@ private:
     std::map<int64, int64> _tablet_partition_map;
 
     // tablet_id -> TabletChannel
+    // when you erase, you should call deregister_writer method in 
MemTableMemoryLimiter;
     std::unordered_map<int64_t, DeltaWriter*> _tablet_writers;
     // broken tablet ids.
     // If a tablet write fails, it's id will be added to this set.
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 646a4449c0..39036d4589 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -190,6 +190,7 @@ public:
     UIntGauge* compaction_mem_consumption;
     UIntGauge* load_mem_consumption;
     UIntGauge* load_channel_mem_consumption;
+    UIntGauge* memtable_memory_limiter_mem_consumption;
     UIntGauge* query_mem_consumption;
     UIntGauge* schema_change_mem_consumption;
     UIntGauge* storage_migration_mem_consumption;
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp 
b/be/test/olap/memtable_memory_limiter_test.cpp
new file mode 100644
index 0000000000..7b49b22b32
--- /dev/null
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memtable_memory_limiter.h"
+
+#include "exec/tablet_info.h"
+#include "gtest/gtest_pred_impl.h"
+#include "olap/delta_writer.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/descriptors.h"
+
+namespace doris {
+static const uint32_t MAX_PATH_LEN = 1024;
+
+static void create_tablet_request(int64_t tablet_id, int32_t schema_hash,
+                                  TCreateTabletReq* request) {
+    request->tablet_id = tablet_id;
+    request->__set_version(1);
+    request->tablet_schema.schema_hash = schema_hash;
+    request->tablet_schema.short_key_column_count = 3;
+    request->tablet_schema.keys_type = TKeysType::AGG_KEYS;
+    request->tablet_schema.storage_type = TStorageType::COLUMN;
+    request->__set_storage_format(TStorageFormat::V2);
+
+    TColumn k1;
+    k1.column_name = "k1";
+    k1.__set_is_key(true);
+    k1.column_type.type = TPrimitiveType::TINYINT;
+    request->tablet_schema.columns.push_back(k1);
+
+    TColumn k2;
+    k2.column_name = "k2";
+    k2.__set_is_key(true);
+    k2.column_type.type = TPrimitiveType::SMALLINT;
+    request->tablet_schema.columns.push_back(k2);
+
+    TColumn k3;
+    k3.column_name = "k3";
+    k3.__set_is_key(true);
+    k3.column_type.type = TPrimitiveType::INT;
+    request->tablet_schema.columns.push_back(k3);
+}
+
+static TDescriptorTable create_descriptor_tablet() {
+    TDescriptorTableBuilder dtb;
+    TTupleDescriptorBuilder tuple_builder;
+
+    tuple_builder.add_slot(
+            
TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("k1").column_pos(0).build());
+    tuple_builder.add_slot(
+            
TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("k2").column_pos(1).build());
+    tuple_builder.add_slot(
+            
TSlotDescriptorBuilder().type(TYPE_INT).column_name("k3").column_pos(2).build());
+
+    tuple_builder.build(&dtb);
+    return dtb.desc_tbl();
+}
+
+class MemTableMemoryLimiterTest : public testing::Test {
+protected:
+    void SetUp() override {
+        // set path
+        char buffer[MAX_PATH_LEN];
+        EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+        config::storage_root_path = std::string(buffer) + "/data_test";
+        
io::global_local_filesystem()->delete_and_create_directory(config::storage_root_path);
+        std::vector<StorePath> paths;
+        paths.emplace_back(config::storage_root_path, -1);
+
+        _mgr = new MemTableMemoryLimiter();
+        doris::EngineOptions options;
+        options.store_paths = paths;
+        Status s = doris::StorageEngine::open(options, &_engine);
+        ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+        exec_env->set_storage_engine(_engine);
+        _engine->start_bg_threads();
+    }
+
+    void TearDown() override {
+        if (_engine != nullptr) {
+            _engine->stop();
+            delete _engine;
+            _engine = nullptr;
+        }
+        if (_mgr != nullptr) {
+            delete _mgr;
+            _mgr = nullptr;
+        }
+        EXPECT_EQ(system("rm -rf ./data_test"), 0);
+        
io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME"))
 + "/" +
+                                                        UNUSED_PREFIX);
+    }
+
+    StorageEngine* _engine = nullptr;
+    MemTableMemoryLimiter* _mgr = nullptr;
+};
+
+TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
+    TCreateTabletReq request;
+    create_tablet_request(10000, 270068372, &request);
+    Status res = _engine->create_tablet(request);
+    ASSERT_TRUE(res.ok());
+
+    TDescriptorTable tdesc_tbl = create_descriptor_tablet();
+    ObjectPool obj_pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    OlapTableSchemaParam param;
+
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+    WriteRequest write_req = {
+            10000, 270068372, 20002, 30002, load_id, tuple_desc, 
&(tuple_desc->slots()),
+            false, &param};
+    DeltaWriter* delta_writer = nullptr;
+    std::unique_ptr<RuntimeProfile> profile;
+    profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest");
+    DeltaWriter::open(&write_req, &delta_writer, profile.get(), TUniqueId());
+    ASSERT_NE(delta_writer, nullptr);
+
+    vectorized::Block block;
+    for (const auto& slot_desc : tuple_desc->slots()) {
+        
block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                       
slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+    auto columns = block.mutate_columns();
+    {
+        int8_t k1 = -127;
+        columns[0]->insert_data((const char*)&k1, sizeof(k1));
+
+        int16_t k2 = -32767;
+        columns[1]->insert_data((const char*)&k2, sizeof(k2));
+
+        int32_t k3 = -2147483647;
+        columns[2]->insert_data((const char*)&k3, sizeof(k3));
+
+        res = delta_writer->write(&block, {0});
+        ASSERT_TRUE(res.ok());
+    }
+    std::mutex lock;
+    _mgr->init(100);
+    {
+        std::lock_guard<std::mutex> l(lock);
+        _mgr->register_writer(delta_writer);
+    }
+    _mgr->handle_memtable_flush();
+    CHECK_EQ(0, delta_writer->active_memtable_mem_consumption());
+    {
+        std::lock_guard<std::mutex> l(lock);
+        _mgr->deregister_writer(delta_writer);
+    }
+
+    res = delta_writer->close();
+    EXPECT_EQ(Status::OK(), res);
+    res = delta_writer->build_rowset();
+    EXPECT_EQ(Status::OK(), res);
+    res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+    EXPECT_EQ(Status::OK(), res);
+    res = _engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
+    EXPECT_EQ(Status::OK(), res);
+    delete delta_writer;
+}
+} // namespace doris
\ No newline at end of file
diff --git a/docs/en/docs/admin-manual/config/be-config.md 
b/docs/en/docs/admin-manual/config/be-config.md
index 8ab61da608..fe00733df9 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -958,6 +958,11 @@ BaseCompaction:546859:
 * Description: Whether to use mmap to allocate memory
 * Default value: false
 
+#### `memtable_mem_tracker_refresh_interval_ms`
+
+* Description: Interval in milliseconds between memtable flush manager refresh 
iterations
+* Default value: 100
+
 #### `download_cache_buffer_size`
 
 * Type: int64
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md 
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index c044319a8d..170a1f785b 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -971,6 +971,11 @@ BaseCompaction:546859:
 * 描述:是否使用mmap分配内存
 * 默认值:false
 
+#### `memtable_mem_tracker_refresh_interval_ms`
+
+* 描述:memtable主动下刷时刷新内存统计的周期(毫秒)
+* 默认值:100
+
 #### `download_cache_buffer_size`
 
 * 类型: int64


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to