This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 9c3f3fcb9a2 refactor write buffer (#42556)
9c3f3fcb9a2 is described below

commit 9c3f3fcb9a2f38017b72ac9972bba077901b5fcd
Author: yiguolei <676222...@qq.com>
AuthorDate: Mon Oct 28 14:36:37 2024 +0800

    refactor write buffer (#42556)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/olap/memtable.cpp                              |  8 ++++++--
 be/src/olap/memtable_flush_executor.cpp               |  2 ++
 be/src/runtime/memory/mem_tracker_limiter.cpp         | 18 +++---------------
 be/src/runtime/memory/mem_tracker_limiter.h           | 19 +++++--------------
 be/src/runtime/workload_group/workload_group.cpp      |  5 ++---
 be/src/runtime/workload_group/workload_group.h        |  3 +--
 .../runtime/workload_group/workload_group_manager.cpp |  5 ++---
 7 files changed, 21 insertions(+), 39 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index facbc90c450..cc16cb481ed 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -78,7 +78,6 @@ MemTable::MemTable(int64_t tablet_id, 
std::shared_ptr<TabletSchema> tablet_schem
     // TODO: Support ZOrderComparator in the future
     _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
     _mem_tracker = std::make_shared<MemTracker>();
-    _query_thread_context.query_mem_tracker->push_load_buffer(_mem_tracker);
 }
 
 void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
@@ -144,7 +143,8 @@ void MemTable::_init_agg_functions(const vectorized::Block* 
block) {
 }
 
 MemTable::~MemTable() {
-    
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+            _query_thread_context.query_mem_tracker->write_tracker());
     if (_is_flush_success) {
         // If the memtable is flush success, then its memtracker's consumption 
should be 0
         if (_mem_tracker->consumption() != 0 && 
config::crash_in_memory_tracker_inaccurate) {
@@ -184,6 +184,8 @@ int RowInBlockComparator::operator()(const RowInBlock* 
left, const RowInBlock* r
 
 Status MemTable::insert(const vectorized::Block* input_block,
                         const std::vector<uint32_t>& row_idxs) {
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+            _query_thread_context.query_mem_tracker->write_tracker());
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
 
     if (_is_first_insertion) {
@@ -580,6 +582,8 @@ void MemTable::_aggregate() {
 }
 
 void MemTable::shrink_memtable_by_agg() {
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+            _query_thread_context.query_mem_tracker->write_tracker());
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     if (_keys_type == KeysType::DUP_KEYS) {
         return;
diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index dc911647be8..7599a3f6b86 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -142,6 +142,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, 
int32_t segment_id, in
     signal::set_signal_task_id(_rowset_writer->load_id());
     signal::tablet_id = memtable->tablet_id();
     {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                
memtable->query_thread_context().query_mem_tracker->write_tracker());
         SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
         std::unique_ptr<vectorized::Block> block;
         RETURN_IF_ERROR(memtable->to_block(&block));
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 487b7d48fae..c0555c2fcf3 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -82,6 +82,9 @@ std::shared_ptr<MemTrackerLimiter> 
MemTrackerLimiter::create_shared(MemTrackerLi
                                                                     const 
std::string& label,
                                                                     int64_t 
byte_limit) {
     auto tracker = std::make_shared<MemTrackerLimiter>(type, label, 
byte_limit);
+    // Write tracker is only used to track the size, so limit == -1
+    auto write_tracker = std::make_shared<MemTrackerLimiter>(type, "Memtable" 
+ label, -1);
+    tracker->_write_tracker.swap(write_tracker);
 #ifndef BE_TEST
     DCHECK(ExecEnv::tracking_memory());
     std::lock_guard<std::mutex> l(
@@ -257,21 +260,6 @@ void MemTrackerLimiter::clean_tracker_limiter_group() {
 #endif
 }
 
-void MemTrackerLimiter::update_load_buffer_size() {
-    std::lock_guard l(_load_buffer_lock);
-    int64_t total_buf_size = 0;
-    for (auto memtable_tracker = _load_buffers.begin(); memtable_tracker != 
_load_buffers.end();) {
-        auto m = memtable_tracker->lock();
-        if (m == nullptr) {
-            memtable_tracker = _load_buffers.erase(memtable_tracker);
-        } else {
-            total_buf_size += m->consumption();
-            ++memtable_tracker;
-        }
-    }
-    _load_buffer_size = total_buf_size;
-}
-
 void MemTrackerLimiter::make_type_trackers_profile(RuntimeProfile* profile,
                                                    MemTrackerLimiter::Type 
type) {
     if (type == Type::GLOBAL) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 6f0ecdbe975..1b03f2f2082 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -130,7 +130,6 @@ public:
                                                             int64_t byte_limit 
= -1);
     // byte_limit equal to -1 means no consumption limit, only participate in 
process memory statistics.
     MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit);
-
     ~MemTrackerLimiter();
 
     Type type() const { return _type; }
@@ -210,8 +209,7 @@ public:
         if (UNLIKELY(bytes == 0)) {
             return true;
         }
-        // Reserve will check limit, should ignore load buffer size.
-        bool rt = _mem_counter.try_add(bytes - _load_buffer_size, _limit);
+        bool rt = _mem_counter.try_add(bytes, _limit);
         if (rt && _query_statistics) {
             _query_statistics->set_max_peak_memory_bytes(peak_consumption());
             _query_statistics->set_current_used_memory_bytes(consumption());
@@ -237,14 +235,9 @@ public:
     static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* 
profile, int top_num);
     static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
 
-    void push_load_buffer(std::shared_ptr<MemTracker> memtable_tracker) {
-        std::lock_guard l(_load_buffer_lock);
-        _load_buffers.push_back(memtable_tracker);
-    }
-
-    void update_load_buffer_size();
+    int64_t load_buffer_size() const { return _write_tracker->consumption(); }
 
-    int64_t load_buffer_size() const { return _load_buffer_size; }
+    std::shared_ptr<MemTrackerLimiter> write_tracker() { return 
_write_tracker; }
 
     void print_log_usage(const std::string& msg);
     void enable_print_log_usage() { _enable_print_log_usage = true; }
@@ -340,9 +333,7 @@ private:
 
     std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 
-    std::mutex _load_buffer_lock;
-    std::vector<std::weak_ptr<MemTracker>> _load_buffers;
-    std::atomic<int64_t> _load_buffer_size = 0;
+    std::shared_ptr<MemTrackerLimiter> _write_tracker;
 
     struct AddressSanitizer {
         size_t size;
@@ -377,7 +368,7 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) 
{
         return Status::OK();
     }
     // check limit should ignore memtable size, because it is treated as a 
cache
-    if (_limit > 0 && consumption() - _load_buffer_size + bytes > _limit) {
+    if (_limit > 0 && consumption() + bytes > _limit) {
         return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, 
{}",
                                                        
MemCounter::print_bytes(bytes),
                                                        
tracker_limit_exceeded_str()));
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index f7a42a34a5a..c0895e8f0fd 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -194,7 +194,6 @@ int64_t WorkloadGroup::refresh_memory_usage() {
             if (tracker == nullptr) {
                 trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
             } else {
-                tracker->update_load_buffer_size();
                 used_memory += tracker->consumption();
                 load_buffer_size += tracker->load_buffer_size();
                 ++trackerWptr;
@@ -202,13 +201,13 @@ int64_t WorkloadGroup::refresh_memory_usage() {
         }
     }
     // refresh total memory used.
-    _total_mem_used = used_memory;
+    _total_mem_used = used_memory + load_buffer_size;
     _load_buffer_size = load_buffer_size;
     // reserve memory is recorded in the query mem tracker
     // and _total_mem_used already contains all the current reserve memory.
     // so after refreshing _total_mem_used, reset 
_wg_refresh_interval_memory_growth.
     _wg_refresh_interval_memory_growth.store(0.0);
-    _mem_used_status->set_value(used_memory);
+    _mem_used_status->set_value(_total_mem_used);
     return used_memory;
 }
 
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 5ec574a5381..ccdc6374ce8 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -80,7 +80,7 @@ public:
 
     int64_t total_mem_used() const { return _total_mem_used; }
 
-    int64_t load_mem_used() const { return _load_buffer_size; }
+    int64_t write_buffer_size() const { return _load_buffer_size; }
 
     void enable_write_buffer_limit(bool enable_limit) { 
_enable_write_buffer_limit = enable_limit; }
 
@@ -190,7 +190,6 @@ public:
 
     void update_total_local_scan_io_adder(size_t scan_bytes);
 
-    int64_t get_mem_used() { return _mem_used_status->get_value(); }
     uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); }
     int64_t get_local_scan_bytes_per_second() {
         return _total_local_scan_io_per_second->get_value();
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 25c983b98f3..d7130ae26e4 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -232,7 +232,7 @@ void 
WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
     for (const auto& [id, wg] : _workload_groups) {
         SchemaScannerHelper::insert_int64_value(0, be_id, block);
         SchemaScannerHelper::insert_int64_value(1, wg->id(), block);
-        SchemaScannerHelper::insert_int64_value(2, wg->get_mem_used(), block);
+        SchemaScannerHelper::insert_int64_value(2, wg->total_mem_used(), 
block);
 
         double cpu_usage_p =
                 (double)wg->get_cpu_usage() / 
(double)total_cpu_time_ns_per_second * 100;
@@ -543,7 +543,6 @@ int64_t 
WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryConte
         return total_freed_mem;
     }
     // 2. Cancel top usage query, one by one
-    std::map<WorkloadGroupPtr, int64_t> wg_mem_usage;
     using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>;
     auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) {
         return left.second < right.second;
@@ -653,7 +652,7 @@ void 
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
     wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
     int64_t wg_high_water_mark_limit =
             (int64_t)(wg_mem_limit * wg->spill_threshold_high_water_mark() * 
1.0 / 100);
-    int64_t memtable_usage = wg->load_mem_used();
+    int64_t memtable_usage = wg->write_buffer_size();
     int64_t wg_high_water_mark_except_load = wg_high_water_mark_limit;
     if (memtable_usage > wg->write_buffer_limit()) {
         wg_high_water_mark_except_load = wg_high_water_mark_limit - 
wg->write_buffer_limit();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to