This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d8ec53c83f [enhancement](load) avoid duplicate reduce on same 
TabletsChannel #12975
d8ec53c83f is described below

commit d8ec53c83f4726abc15808b6c58e6ef3a1080d28
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Tue Sep 27 22:03:08 2022 +0800

    [enhancement](load) avoid duplicate reduce on same TabletsChannel #12975
    
    In the policy changed by PR #12716, when the hard limit is reached,
    multiple threads might pick the same LoadChannel and call
    reduce_mem_usage on the same TabletsChannel. Although a lock and a
    condition variable prevent multiple threads from reducing memory usage
    concurrently, they can still perform the same reduce work on that
    channel multiple times, one after another, even when its memory has
    just been reduced.
---
 be/src/runtime/load_channel_mgr.cpp |  9 ---------
 be/src/runtime/load_channel_mgr.h   |  9 ++++++---
 be/src/runtime/tablets_channel.cpp  | 22 ++++++++++++++++++++--
 be/src/runtime/tablets_channel.h    |  2 +-
 4 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 2405a28ae7..e6f908f69c 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -112,15 +112,6 @@ Status LoadChannelMgr::open(const 
PTabletWriterOpenRequest& params) {
     return Status::OK();
 }
 
-void LoadChannelMgr::_try_to_wait_flushing() {
-    std::unique_lock<std::mutex> l(_lock);
-    while (_should_wait_flush) {
-        LOG(INFO) << "Reached the load channel manager mem limit " << 
_mem_tracker->limit()
-                  << ", waiting for flush";
-        _wait_flush_cond.wait(l);
-    }
-}
-
 static void dummy_deleter(const CacheKey& key, void* value) {}
 
 void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) {
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index fb4a5d3592..686322b076 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -68,7 +68,6 @@ private:
     // If yes, it will pick a load channel to try to reduce memory consumption.
     template <typename TabletWriterAddResult>
     Status _handle_mem_exceed_limit(TabletWriterAddResult* response);
-    void _try_to_wait_flushing();
 
     Status _start_bg_worker();
 
@@ -152,7 +151,6 @@ Status LoadChannelMgr::add_batch(const 
TabletWriterAddRequest& request,
 
 template <typename TabletWriterAddResult>
 Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* 
response) {
-    _try_to_wait_flushing();
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
     DCHECK(_process_soft_mem_limit > 0);
@@ -163,7 +161,12 @@ Status 
LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
     // Pick load channel to reduce memory.
     std::shared_ptr<LoadChannel> channel;
     {
-        std::lock_guard<std::mutex> l(_lock);
+        std::unique_lock<std::mutex> l(_lock);
+        while (_should_wait_flush) {
+            LOG(INFO) << "Reached the load hard limit " << 
_mem_tracker->limit()
+                      << ", waiting for flush";
+            _wait_flush_cond.wait(l);
+        }
         // Some other thread is flushing data, and not reached hard limit now,
         // we don't need to handle mem limit in current thread.
         if (_reduce_memory_channel != nullptr && 
!_mem_tracker->limit_exceeded() &&
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 7fb8322b12..fa72d23ad0 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -196,7 +196,12 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
 
 template <typename TabletWriterAddResult>
 Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) {
-    _try_to_wait_flushing();
+    if (_try_to_wait_flushing()) {
+        // `_try_to_wait_flushing()` returns true means other thread already
+        // reduced the mem usage, and current thread do not need to reduce 
again.
+        return Status::OK();
+    }
+
     std::vector<DeltaWriter*> writers_to_flush;
     {
         std::lock_guard<std::mutex> l(_lock);
@@ -335,11 +340,24 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& request
     return Status::OK();
 }
 
-void TabletsChannel::_try_to_wait_flushing() {
+bool TabletsChannel::_try_to_wait_flushing() {
+    bool duplicate_work = false;
     std::unique_lock<std::mutex> l(_lock);
+    // NOTE: we call `reduce_mem_usage()` because we think it's necessary
+    // to reduce it's memory and should not write more data into this
+    // tablets channel. If there's already some other thead doing the
+    // reduce-memory work, the only choice for current thread is to wait
+    // here.
+    // If current thread do not wait, it has two options:
+    // 1. continue to write data to current channel.
+    // 2. pick another tablets channel to flush
+    // The first choice might cause OOM, the second choice might pick a
+    // channel that is not big enough.
     while (_reducing_mem_usage) {
+        duplicate_work = true;
         _reduce_memory_cond.wait(l);
     }
+    return duplicate_work;
 }
 
 Status TabletsChannel::cancel() {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index e11e314e6d..cf7e067b32 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -105,7 +105,7 @@ private:
     // open all writer
     Status _open_all_writers(const PTabletWriterOpenRequest& request);
 
-    void _try_to_wait_flushing();
+    bool _try_to_wait_flushing();
 
     // deal with DeltaWriter close_wait(), add tablet to list for return.
     void _close_wait(DeltaWriter* writer,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to