github-actions[bot] commented on code in PR #25653:
URL: https://github.com/apache/doris/pull/25653#discussion_r1369904458


##########
be/src/olap/memtable_memory_limiter.cpp:
##########
@@ -59,135 +59,81 @@ void 
MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer
     _writers.push_back(writer);
 }
 
+bool MemTableMemoryLimiter::_soft_limit_reached() {
+    return _mem_tracker->consumption() >= _load_soft_mem_limit ||
+           (MemInfo::proc_mem_no_allocator_cache() >= 
MemInfo::soft_mem_limit() &&
+            _mem_tracker->consumption() >= _load_hard_mem_limit / 10);
+}
+
+bool MemTableMemoryLimiter::_hard_limit_reached() {
+    return _mem_tracker->consumption() >= _load_hard_mem_limit ||
+           MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit();
+}
+
 void MemTableMemoryLimiter::handle_memtable_flush() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
-    // Record current memory status.
-    int64_t process_soft_mem_limit = MemInfo::soft_mem_limit();
-    int64_t proc_mem_no_allocator_cache = 
MemInfo::proc_mem_no_allocator_cache();
-#ifndef BE_TEST
-    // If process memory is almost full but data load don't consume more than 
5% (50% * 10%) of
-    // total memory, we don't need to flush memtable.
-    bool reduce_on_process_soft_mem_limit =
-            proc_mem_no_allocator_cache >= process_soft_mem_limit &&
-            _mem_tracker->consumption() >= _load_hard_mem_limit / 10;
-    if (_mem_tracker->consumption() < _load_soft_mem_limit && 
!reduce_on_process_soft_mem_limit) {
+    if (!_soft_limit_reached()) {
         return;
     }
-#endif
-    // Indicate whether current thread is reducing mem on hard limit.
-    bool reducing_mem_on_hard_limit = false;
-    Status st;
-    std::vector<WriterMemItem> writers_to_reduce_mem;
     {
         MonotonicStopWatch timer;
         timer.start();
         std::unique_lock<std::mutex> l(_lock);
-        while (_should_wait_flush) {
-            _wait_flush_cond.wait(l);
-        }
-        LOG(INFO) << "Reached the one tenth of load hard limit " << 
_load_hard_mem_limit / 10
-                  << "and process remaining allocator cache " << 
proc_mem_no_allocator_cache
-                  << "reached process soft memory limit " << 
process_soft_mem_limit
-                  << ", waited for flush, time_ns:" << timer.elapsed_time();
-#ifndef BE_TEST
-        bool hard_limit_reached = _mem_tracker->consumption() >= 
_load_hard_mem_limit ||
-                                  proc_mem_no_allocator_cache >= 
process_soft_mem_limit;
-        // Some other thread is flushing data, and not reached hard limit now,
-        // we don't need to handle mem limit in current thread.
-        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
-            return;
-        }
-#endif
-
-        auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
-            return lhs.mem_size < rhs.mem_size;
-        };
-        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, 
decltype(cmp)> mem_heap(cmp);
-
-        for (auto it = _writers.begin(); it != _writers.end();) {
-            if (auto writer = it->lock()) {
-                int64_t active_memtable_mem = 
writer->active_memtable_mem_consumption();
-                mem_heap.emplace(writer, active_memtable_mem);
-                ++it;
-            } else {
-                *it = std::move(_writers.back());
-                _writers.pop_back();
+        while (_soft_limit_reached()) {
+            LOG(INFO) << "reached memtable memory soft limit"
+                      << " (active: " << 
PrettyPrinter::print_bytes(_active_mem_usage)
+                      << ", write: " << 
PrettyPrinter::print_bytes(_write_mem_usage)
+                      << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
+            if (_active_mem_usage >= _load_soft_mem_limit ||
+                (_active_mem_usage > _load_hard_mem_limit / 10 && 
_hard_limit_reached())) {
+                _flush_active_memtables();
             }
+            _soft_limit_end_cond.wait(l);
         }
-        int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
-        int64_t mem_consumption_in_picked_writer = 0;
-        while (!mem_heap.empty()) {
-            WriterMemItem mem_item = mem_heap.top();
-            mem_heap.pop();
-            auto writer = mem_item.writer.lock();
-            if (!writer) {
-                continue;
-            }
-            int64_t mem_size = mem_item.mem_size;
-            writers_to_reduce_mem.emplace_back(writer, mem_size);
-            st = writer->flush_async();
-            if (!st.ok()) {
-                auto err_msg = fmt::format(
-                        "tablet writer failed to reduce mem consumption by 
flushing memtable, "
-                        "tablet_id={}, err={}",
-                        writer->tablet_id(), st.to_string());
-                LOG(WARNING) << err_msg;
-                static_cast<void>(writer->cancel_with_status(st));
-            }
-            mem_consumption_in_picked_writer += mem_size;
-            if (mem_consumption_in_picked_writer > mem_to_flushed) {
-                break;
-            }
-        }
-        if (writers_to_reduce_mem.empty()) {
-            // should not happen, add log to observe
-            LOG(WARNING) << "failed to find suitable writers to reduce memory"
-                         << " when total load mem limit exceed";
-            return;
-        }
+        timer.stop();
+        int64_t time_ms = timer.elapsed_time() / 1000 / 1000;

Review Comment:
   warning: 1000 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
           int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
                                                           ^
   ```
   



##########
be/src/olap/memtable_memory_limiter.cpp:
##########
@@ -59,135 +59,81 @@ void 
MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer
     _writers.push_back(writer);
 }
 
+bool MemTableMemoryLimiter::_soft_limit_reached() {
+    return _mem_tracker->consumption() >= _load_soft_mem_limit ||
+           (MemInfo::proc_mem_no_allocator_cache() >= 
MemInfo::soft_mem_limit() &&
+            _mem_tracker->consumption() >= _load_hard_mem_limit / 10);
+}
+
+bool MemTableMemoryLimiter::_hard_limit_reached() {
+    return _mem_tracker->consumption() >= _load_hard_mem_limit ||
+           MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit();
+}
+
 void MemTableMemoryLimiter::handle_memtable_flush() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
-    // Record current memory status.
-    int64_t process_soft_mem_limit = MemInfo::soft_mem_limit();
-    int64_t proc_mem_no_allocator_cache = 
MemInfo::proc_mem_no_allocator_cache();
-#ifndef BE_TEST
-    // If process memory is almost full but data load don't consume more than 
5% (50% * 10%) of
-    // total memory, we don't need to flush memtable.
-    bool reduce_on_process_soft_mem_limit =
-            proc_mem_no_allocator_cache >= process_soft_mem_limit &&
-            _mem_tracker->consumption() >= _load_hard_mem_limit / 10;
-    if (_mem_tracker->consumption() < _load_soft_mem_limit && 
!reduce_on_process_soft_mem_limit) {
+    if (!_soft_limit_reached()) {
         return;
     }
-#endif
-    // Indicate whether current thread is reducing mem on hard limit.
-    bool reducing_mem_on_hard_limit = false;
-    Status st;
-    std::vector<WriterMemItem> writers_to_reduce_mem;
     {
         MonotonicStopWatch timer;
         timer.start();
         std::unique_lock<std::mutex> l(_lock);
-        while (_should_wait_flush) {
-            _wait_flush_cond.wait(l);
-        }
-        LOG(INFO) << "Reached the one tenth of load hard limit " << 
_load_hard_mem_limit / 10
-                  << "and process remaining allocator cache " << 
proc_mem_no_allocator_cache
-                  << "reached process soft memory limit " << 
process_soft_mem_limit
-                  << ", waited for flush, time_ns:" << timer.elapsed_time();
-#ifndef BE_TEST
-        bool hard_limit_reached = _mem_tracker->consumption() >= 
_load_hard_mem_limit ||
-                                  proc_mem_no_allocator_cache >= 
process_soft_mem_limit;
-        // Some other thread is flushing data, and not reached hard limit now,
-        // we don't need to handle mem limit in current thread.
-        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
-            return;
-        }
-#endif
-
-        auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
-            return lhs.mem_size < rhs.mem_size;
-        };
-        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, 
decltype(cmp)> mem_heap(cmp);
-
-        for (auto it = _writers.begin(); it != _writers.end();) {
-            if (auto writer = it->lock()) {
-                int64_t active_memtable_mem = 
writer->active_memtable_mem_consumption();
-                mem_heap.emplace(writer, active_memtable_mem);
-                ++it;
-            } else {
-                *it = std::move(_writers.back());
-                _writers.pop_back();
+        while (_soft_limit_reached()) {
+            LOG(INFO) << "reached memtable memory soft limit"
+                      << " (active: " << 
PrettyPrinter::print_bytes(_active_mem_usage)
+                      << ", write: " << 
PrettyPrinter::print_bytes(_write_mem_usage)
+                      << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
+            if (_active_mem_usage >= _load_soft_mem_limit ||
+                (_active_mem_usage > _load_hard_mem_limit / 10 && 
_hard_limit_reached())) {

Review Comment:
   warning: 10 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
                   (_active_mem_usage > _load_hard_mem_limit / 10 && _hard_limit_reached())) {
                                                               ^
   ```
   



##########
be/src/olap/memtable_memory_limiter.cpp:
##########
@@ -59,135 +59,81 @@ void 
MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer
     _writers.push_back(writer);
 }
 
+bool MemTableMemoryLimiter::_soft_limit_reached() {
+    return _mem_tracker->consumption() >= _load_soft_mem_limit ||
+           (MemInfo::proc_mem_no_allocator_cache() >= 
MemInfo::soft_mem_limit() &&
+            _mem_tracker->consumption() >= _load_hard_mem_limit / 10);
+}
+
+bool MemTableMemoryLimiter::_hard_limit_reached() {
+    return _mem_tracker->consumption() >= _load_hard_mem_limit ||
+           MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit();
+}
+
 void MemTableMemoryLimiter::handle_memtable_flush() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
-    // Record current memory status.
-    int64_t process_soft_mem_limit = MemInfo::soft_mem_limit();
-    int64_t proc_mem_no_allocator_cache = 
MemInfo::proc_mem_no_allocator_cache();
-#ifndef BE_TEST
-    // If process memory is almost full but data load don't consume more than 
5% (50% * 10%) of
-    // total memory, we don't need to flush memtable.
-    bool reduce_on_process_soft_mem_limit =
-            proc_mem_no_allocator_cache >= process_soft_mem_limit &&
-            _mem_tracker->consumption() >= _load_hard_mem_limit / 10;
-    if (_mem_tracker->consumption() < _load_soft_mem_limit && 
!reduce_on_process_soft_mem_limit) {
+    if (!_soft_limit_reached()) {
         return;
     }
-#endif
-    // Indicate whether current thread is reducing mem on hard limit.
-    bool reducing_mem_on_hard_limit = false;
-    Status st;
-    std::vector<WriterMemItem> writers_to_reduce_mem;
     {
         MonotonicStopWatch timer;
         timer.start();
         std::unique_lock<std::mutex> l(_lock);
-        while (_should_wait_flush) {
-            _wait_flush_cond.wait(l);
-        }
-        LOG(INFO) << "Reached the one tenth of load hard limit " << 
_load_hard_mem_limit / 10
-                  << "and process remaining allocator cache " << 
proc_mem_no_allocator_cache
-                  << "reached process soft memory limit " << 
process_soft_mem_limit
-                  << ", waited for flush, time_ns:" << timer.elapsed_time();
-#ifndef BE_TEST
-        bool hard_limit_reached = _mem_tracker->consumption() >= 
_load_hard_mem_limit ||
-                                  proc_mem_no_allocator_cache >= 
process_soft_mem_limit;
-        // Some other thread is flushing data, and not reached hard limit now,
-        // we don't need to handle mem limit in current thread.
-        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
-            return;
-        }
-#endif
-
-        auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
-            return lhs.mem_size < rhs.mem_size;
-        };
-        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, 
decltype(cmp)> mem_heap(cmp);
-
-        for (auto it = _writers.begin(); it != _writers.end();) {
-            if (auto writer = it->lock()) {
-                int64_t active_memtable_mem = 
writer->active_memtable_mem_consumption();
-                mem_heap.emplace(writer, active_memtable_mem);
-                ++it;
-            } else {
-                *it = std::move(_writers.back());
-                _writers.pop_back();
+        while (_soft_limit_reached()) {
+            LOG(INFO) << "reached memtable memory soft limit"
+                      << " (active: " << 
PrettyPrinter::print_bytes(_active_mem_usage)
+                      << ", write: " << 
PrettyPrinter::print_bytes(_write_mem_usage)
+                      << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
+            if (_active_mem_usage >= _load_soft_mem_limit ||
+                (_active_mem_usage > _load_hard_mem_limit / 10 && 
_hard_limit_reached())) {
+                _flush_active_memtables();
             }
+            _soft_limit_end_cond.wait(l);
         }
-        int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
-        int64_t mem_consumption_in_picked_writer = 0;
-        while (!mem_heap.empty()) {
-            WriterMemItem mem_item = mem_heap.top();
-            mem_heap.pop();
-            auto writer = mem_item.writer.lock();
-            if (!writer) {
-                continue;
-            }
-            int64_t mem_size = mem_item.mem_size;
-            writers_to_reduce_mem.emplace_back(writer, mem_size);
-            st = writer->flush_async();
-            if (!st.ok()) {
-                auto err_msg = fmt::format(
-                        "tablet writer failed to reduce mem consumption by 
flushing memtable, "
-                        "tablet_id={}, err={}",
-                        writer->tablet_id(), st.to_string());
-                LOG(WARNING) << err_msg;
-                static_cast<void>(writer->cancel_with_status(st));
-            }
-            mem_consumption_in_picked_writer += mem_size;
-            if (mem_consumption_in_picked_writer > mem_to_flushed) {
-                break;
-            }
-        }
-        if (writers_to_reduce_mem.empty()) {
-            // should not happen, add log to observe
-            LOG(WARNING) << "failed to find suitable writers to reduce memory"
-                         << " when total load mem limit exceed";
-            return;
-        }
+        timer.stop();
+        int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
+        LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
+    }
+}
 
-        std::ostringstream oss;
-        oss << "reducing memory of " << writers_to_reduce_mem.size()
-            << " memtable writers (total mem: "
-            << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer)
-            << ", max mem: " << 
PrettyPrinter::print_bytes(writers_to_reduce_mem.front().mem_size)
-            << ", min mem:" << 
PrettyPrinter::print_bytes(writers_to_reduce_mem.back().mem_size)
-            << "), ";
-        if (proc_mem_no_allocator_cache < process_soft_mem_limit) {
-            oss << "because total load mem consumption "
-                << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << 
" has exceeded";
-            if (_mem_tracker->consumption() > _load_hard_mem_limit) {
-                _should_wait_flush = true;
-                reducing_mem_on_hard_limit = true;
-                oss << " hard limit: " << 
PrettyPrinter::print_bytes(_load_hard_mem_limit);
-            } else {
-                _soft_reduce_mem_in_progress = true;
-                oss << " soft limit: " << 
PrettyPrinter::print_bytes(_load_soft_mem_limit);
-            }
+void MemTableMemoryLimiter::_flush_active_memtables() {
+    auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { return 
lhs.mem_size < rhs.mem_size; };
+    std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, 
decltype(cmp)> mem_heap(cmp);
+
+    _active_mem_usage = 0;
+    for (auto it = _writers.begin(); it != _writers.end();) {
+        if (auto writer = it->lock()) {
+            int64_t active_memtable_mem = 
writer->active_memtable_mem_consumption();
+            _active_mem_usage += active_memtable_mem;
+            mem_heap.emplace(writer, active_memtable_mem);
+            ++it;
         } else {
-            _should_wait_flush = true;
-            reducing_mem_on_hard_limit = true;
-            oss << "because proc_mem_no_allocator_cache consumption "
-                << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache)
-                << ", has exceeded process soft limit "
-                << PrettyPrinter::print_bytes(process_soft_mem_limit)
-                << ", total load mem consumption: "
-                << PrettyPrinter::print_bytes(_mem_tracker->consumption())
-                << ", vm_rss: " << PerfCounters::get_vm_rss_str();
+            *it = std::move(_writers.back());
+            _writers.pop_back();
         }
-        LOG(INFO) << oss.str();
     }
-
-    // wait all writers flush without lock
-    for (auto item : writers_to_reduce_mem) {
-        VLOG_NOTICE << "reducing memory, wait flush mem_size: "
-                    << PrettyPrinter::print_bytes(item.mem_size);
-        auto writer = item.writer.lock();
+    int64_t mem_to_flush = _hard_limit_reached()
+                                   ? _active_mem_usage - _load_hard_mem_limit 
/ 11
+                                   : _active_mem_usage - _load_soft_mem_limit 
/ 5 * 4;

Review Comment:
   warning: 5 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
                                      : _active_mem_usage - _load_soft_mem_limit / 5 * 4;
                                                                                   ^
   ```
   



##########
be/src/olap/memtable_memory_limiter.cpp:
##########
@@ -59,135 +59,81 @@ void 
MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer
     _writers.push_back(writer);
 }
 
+bool MemTableMemoryLimiter::_soft_limit_reached() {
+    return _mem_tracker->consumption() >= _load_soft_mem_limit ||
+           (MemInfo::proc_mem_no_allocator_cache() >= 
MemInfo::soft_mem_limit() &&
+            _mem_tracker->consumption() >= _load_hard_mem_limit / 10);
+}
+
+bool MemTableMemoryLimiter::_hard_limit_reached() {
+    return _mem_tracker->consumption() >= _load_hard_mem_limit ||
+           MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit();
+}
+
 void MemTableMemoryLimiter::handle_memtable_flush() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
-    // Record current memory status.
-    int64_t process_soft_mem_limit = MemInfo::soft_mem_limit();
-    int64_t proc_mem_no_allocator_cache = 
MemInfo::proc_mem_no_allocator_cache();
-#ifndef BE_TEST
-    // If process memory is almost full but data load don't consume more than 
5% (50% * 10%) of
-    // total memory, we don't need to flush memtable.
-    bool reduce_on_process_soft_mem_limit =
-            proc_mem_no_allocator_cache >= process_soft_mem_limit &&
-            _mem_tracker->consumption() >= _load_hard_mem_limit / 10;
-    if (_mem_tracker->consumption() < _load_soft_mem_limit && 
!reduce_on_process_soft_mem_limit) {
+    if (!_soft_limit_reached()) {
         return;
     }
-#endif
-    // Indicate whether current thread is reducing mem on hard limit.
-    bool reducing_mem_on_hard_limit = false;
-    Status st;
-    std::vector<WriterMemItem> writers_to_reduce_mem;
     {
         MonotonicStopWatch timer;
         timer.start();
         std::unique_lock<std::mutex> l(_lock);
-        while (_should_wait_flush) {
-            _wait_flush_cond.wait(l);
-        }
-        LOG(INFO) << "Reached the one tenth of load hard limit " << 
_load_hard_mem_limit / 10
-                  << "and process remaining allocator cache " << 
proc_mem_no_allocator_cache
-                  << "reached process soft memory limit " << 
process_soft_mem_limit
-                  << ", waited for flush, time_ns:" << timer.elapsed_time();
-#ifndef BE_TEST
-        bool hard_limit_reached = _mem_tracker->consumption() >= 
_load_hard_mem_limit ||
-                                  proc_mem_no_allocator_cache >= 
process_soft_mem_limit;
-        // Some other thread is flushing data, and not reached hard limit now,
-        // we don't need to handle mem limit in current thread.
-        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
-            return;
-        }
-#endif
-
-        auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
-            return lhs.mem_size < rhs.mem_size;
-        };
-        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, 
decltype(cmp)> mem_heap(cmp);
-
-        for (auto it = _writers.begin(); it != _writers.end();) {
-            if (auto writer = it->lock()) {
-                int64_t active_memtable_mem = 
writer->active_memtable_mem_consumption();
-                mem_heap.emplace(writer, active_memtable_mem);
-                ++it;
-            } else {
-                *it = std::move(_writers.back());
-                _writers.pop_back();
+        while (_soft_limit_reached()) {
+            LOG(INFO) << "reached memtable memory soft limit"
+                      << " (active: " << 
PrettyPrinter::print_bytes(_active_mem_usage)
+                      << ", write: " << 
PrettyPrinter::print_bytes(_write_mem_usage)
+                      << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
+            if (_active_mem_usage >= _load_soft_mem_limit ||
+                (_active_mem_usage > _load_hard_mem_limit / 10 && 
_hard_limit_reached())) {
+                _flush_active_memtables();
             }
+            _soft_limit_end_cond.wait(l);
         }
-        int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
-        int64_t mem_consumption_in_picked_writer = 0;
-        while (!mem_heap.empty()) {
-            WriterMemItem mem_item = mem_heap.top();
-            mem_heap.pop();
-            auto writer = mem_item.writer.lock();
-            if (!writer) {
-                continue;
-            }
-            int64_t mem_size = mem_item.mem_size;
-            writers_to_reduce_mem.emplace_back(writer, mem_size);
-            st = writer->flush_async();
-            if (!st.ok()) {
-                auto err_msg = fmt::format(
-                        "tablet writer failed to reduce mem consumption by 
flushing memtable, "
-                        "tablet_id={}, err={}",
-                        writer->tablet_id(), st.to_string());
-                LOG(WARNING) << err_msg;
-                static_cast<void>(writer->cancel_with_status(st));
-            }
-            mem_consumption_in_picked_writer += mem_size;
-            if (mem_consumption_in_picked_writer > mem_to_flushed) {
-                break;
-            }
-        }
-        if (writers_to_reduce_mem.empty()) {
-            // should not happen, add log to observe
-            LOG(WARNING) << "failed to find suitable writers to reduce memory"
-                         << " when total load mem limit exceed";
-            return;
-        }
+        timer.stop();
+        int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
+        LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
+    }
+}
 
-        std::ostringstream oss;
-        oss << "reducing memory of " << writers_to_reduce_mem.size()
-            << " memtable writers (total mem: "
-            << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer)
-            << ", max mem: " << 
PrettyPrinter::print_bytes(writers_to_reduce_mem.front().mem_size)
-            << ", min mem:" << 
PrettyPrinter::print_bytes(writers_to_reduce_mem.back().mem_size)
-            << "), ";
-        if (proc_mem_no_allocator_cache < process_soft_mem_limit) {
-            oss << "because total load mem consumption "
-                << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << 
" has exceeded";
-            if (_mem_tracker->consumption() > _load_hard_mem_limit) {
-                _should_wait_flush = true;
-                reducing_mem_on_hard_limit = true;
-                oss << " hard limit: " << 
PrettyPrinter::print_bytes(_load_hard_mem_limit);
-            } else {
-                _soft_reduce_mem_in_progress = true;
-                oss << " soft limit: " << 
PrettyPrinter::print_bytes(_load_soft_mem_limit);
-            }
+void MemTableMemoryLimiter::_flush_active_memtables() {
+    auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { return 
lhs.mem_size < rhs.mem_size; };
+    std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, 
decltype(cmp)> mem_heap(cmp);
+
+    _active_mem_usage = 0;
+    for (auto it = _writers.begin(); it != _writers.end();) {
+        if (auto writer = it->lock()) {
+            int64_t active_memtable_mem = 
writer->active_memtable_mem_consumption();
+            _active_mem_usage += active_memtable_mem;
+            mem_heap.emplace(writer, active_memtable_mem);
+            ++it;
         } else {
-            _should_wait_flush = true;
-            reducing_mem_on_hard_limit = true;
-            oss << "because proc_mem_no_allocator_cache consumption "
-                << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache)
-                << ", has exceeded process soft limit "
-                << PrettyPrinter::print_bytes(process_soft_mem_limit)
-                << ", total load mem consumption: "
-                << PrettyPrinter::print_bytes(_mem_tracker->consumption())
-                << ", vm_rss: " << PerfCounters::get_vm_rss_str();
+            *it = std::move(_writers.back());
+            _writers.pop_back();
         }
-        LOG(INFO) << oss.str();
     }
-
-    // wait all writers flush without lock
-    for (auto item : writers_to_reduce_mem) {
-        VLOG_NOTICE << "reducing memory, wait flush mem_size: "
-                    << PrettyPrinter::print_bytes(item.mem_size);
-        auto writer = item.writer.lock();
+    int64_t mem_to_flush = _hard_limit_reached()
+                                   ? _active_mem_usage - _load_hard_mem_limit 
/ 11

Review Comment:
   warning: 11 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
                                      ? _active_mem_usage - _load_hard_mem_limit / 11
                                                                                   ^
   ```
   



##########
be/src/olap/memtable_memory_limiter.cpp:
##########
@@ -59,135 +59,81 @@ void 
MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer
     _writers.push_back(writer);
 }
 
+bool MemTableMemoryLimiter::_soft_limit_reached() {
+    return _mem_tracker->consumption() >= _load_soft_mem_limit ||
+           (MemInfo::proc_mem_no_allocator_cache() >= 
MemInfo::soft_mem_limit() &&
+            _mem_tracker->consumption() >= _load_hard_mem_limit / 10);
+}
+
+bool MemTableMemoryLimiter::_hard_limit_reached() {
+    return _mem_tracker->consumption() >= _load_hard_mem_limit ||
+           MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit();
+}
+
 void MemTableMemoryLimiter::handle_memtable_flush() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
-    // Record current memory status.
-    int64_t process_soft_mem_limit = MemInfo::soft_mem_limit();
-    int64_t proc_mem_no_allocator_cache = 
MemInfo::proc_mem_no_allocator_cache();
-#ifndef BE_TEST
-    // If process memory is almost full but data load don't consume more than 
5% (50% * 10%) of
-    // total memory, we don't need to flush memtable.
-    bool reduce_on_process_soft_mem_limit =
-            proc_mem_no_allocator_cache >= process_soft_mem_limit &&
-            _mem_tracker->consumption() >= _load_hard_mem_limit / 10;
-    if (_mem_tracker->consumption() < _load_soft_mem_limit && 
!reduce_on_process_soft_mem_limit) {
+    if (!_soft_limit_reached()) {
         return;
     }
-#endif
-    // Indicate whether current thread is reducing mem on hard limit.
-    bool reducing_mem_on_hard_limit = false;
-    Status st;
-    std::vector<WriterMemItem> writers_to_reduce_mem;
     {
         MonotonicStopWatch timer;
         timer.start();
         std::unique_lock<std::mutex> l(_lock);
-        while (_should_wait_flush) {
-            _wait_flush_cond.wait(l);
-        }
-        LOG(INFO) << "Reached the one tenth of load hard limit " << 
_load_hard_mem_limit / 10
-                  << "and process remaining allocator cache " << 
proc_mem_no_allocator_cache
-                  << "reached process soft memory limit " << 
process_soft_mem_limit
-                  << ", waited for flush, time_ns:" << timer.elapsed_time();
-#ifndef BE_TEST
-        bool hard_limit_reached = _mem_tracker->consumption() >= 
_load_hard_mem_limit ||
-                                  proc_mem_no_allocator_cache >= 
process_soft_mem_limit;
-        // Some other thread is flushing data, and not reached hard limit now,
-        // we don't need to handle mem limit in current thread.
-        if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
-            return;
-        }
-#endif
-
-        auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
-            return lhs.mem_size < rhs.mem_size;
-        };
-        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, 
decltype(cmp)> mem_heap(cmp);
-
-        for (auto it = _writers.begin(); it != _writers.end();) {
-            if (auto writer = it->lock()) {
-                int64_t active_memtable_mem = 
writer->active_memtable_mem_consumption();
-                mem_heap.emplace(writer, active_memtable_mem);
-                ++it;
-            } else {
-                *it = std::move(_writers.back());
-                _writers.pop_back();
+        while (_soft_limit_reached()) {
+            LOG(INFO) << "reached memtable memory soft limit"
+                      << " (active: " << 
PrettyPrinter::print_bytes(_active_mem_usage)
+                      << ", write: " << 
PrettyPrinter::print_bytes(_write_mem_usage)
+                      << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
+            if (_active_mem_usage >= _load_soft_mem_limit ||
+                (_active_mem_usage > _load_hard_mem_limit / 10 && 
_hard_limit_reached())) {
+                _flush_active_memtables();
             }
+            _soft_limit_end_cond.wait(l);
         }
-        int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
-        int64_t mem_consumption_in_picked_writer = 0;
-        while (!mem_heap.empty()) {
-            WriterMemItem mem_item = mem_heap.top();
-            mem_heap.pop();
-            auto writer = mem_item.writer.lock();
-            if (!writer) {
-                continue;
-            }
-            int64_t mem_size = mem_item.mem_size;
-            writers_to_reduce_mem.emplace_back(writer, mem_size);
-            st = writer->flush_async();
-            if (!st.ok()) {
-                auto err_msg = fmt::format(
-                        "tablet writer failed to reduce mem consumption by 
flushing memtable, "
-                        "tablet_id={}, err={}",
-                        writer->tablet_id(), st.to_string());
-                LOG(WARNING) << err_msg;
-                static_cast<void>(writer->cancel_with_status(st));
-            }
-            mem_consumption_in_picked_writer += mem_size;
-            if (mem_consumption_in_picked_writer > mem_to_flushed) {
-                break;
-            }
-        }
-        if (writers_to_reduce_mem.empty()) {
-            // should not happen, add log to observe
-            LOG(WARNING) << "failed to find suitable writers to reduce memory"
-                         << " when total load mem limit exceed";
-            return;
-        }
+        timer.stop();
+        int64_t time_ms = timer.elapsed_time() / 1000 / 1000;

Review Comment:
   warning: 1000 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
           int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
                                                    ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to