This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d8ec53c83f [enhancement](load) avoid duplicate reduce on same TabletsChannel #12975 d8ec53c83f is described below commit d8ec53c83f4726abc15808b6c58e6ef3a1080d28 Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com> AuthorDate: Tue Sep 27 22:03:08 2022 +0800 [enhancement](load) avoid duplicate reduce on same TabletsChannel #12975 In the policy changed by PR #12716, when the hard limit is reached, multiple threads may pick the same LoadChannel and call reduce_mem_usage on the same TabletsChannel. Although a lock and a condition variable prevent multiple threads from reducing memory usage concurrently, those threads can still perform the same reduce work on that channel one after another, even when it has just been reduced. --- be/src/runtime/load_channel_mgr.cpp | 9 --------- be/src/runtime/load_channel_mgr.h | 9 ++++++--- be/src/runtime/tablets_channel.cpp | 22 ++++++++++++++++++++-- be/src/runtime/tablets_channel.h | 2 +- 4 files changed, 27 insertions(+), 15 deletions(-) diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 2405a28ae7..e6f908f69c 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -112,15 +112,6 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { return Status::OK(); } -void LoadChannelMgr::_try_to_wait_flushing() { - std::unique_lock<std::mutex> l(_lock); - while (_should_wait_flush) { - LOG(INFO) << "Reached the load channel manager mem limit " << _mem_tracker->limit() - << ", waiting for flush"; - _wait_flush_cond.wait(l); - } -} - static void dummy_deleter(const CacheKey& key, void* value) {} void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) { diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index fb4a5d3592..686322b076 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -68,7 
+68,6 @@ private: // If yes, it will pick a load channel to try to reduce memory consumption. template <typename TabletWriterAddResult> Status _handle_mem_exceed_limit(TabletWriterAddResult* response); - void _try_to_wait_flushing(); Status _start_bg_worker(); @@ -152,7 +151,6 @@ Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request, template <typename TabletWriterAddResult> Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) { - _try_to_wait_flushing(); // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); DCHECK(_process_soft_mem_limit > 0); @@ -163,7 +161,12 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) // Pick load channel to reduce memory. std::shared_ptr<LoadChannel> channel; { - std::lock_guard<std::mutex> l(_lock); + std::unique_lock<std::mutex> l(_lock); + while (_should_wait_flush) { + LOG(INFO) << "Reached the load hard limit " << _mem_tracker->limit() + << ", waiting for flush"; + _wait_flush_cond.wait(l); + } // Some other thread is flushing data, and not reached hard limit now, // we don't need to handle mem limit in current thread. if (_reduce_memory_channel != nullptr && !_mem_tracker->limit_exceeded() && diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 7fb8322b12..fa72d23ad0 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -196,7 +196,12 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, template <typename TabletWriterAddResult> Status TabletsChannel::reduce_mem_usage(TabletWriterAddResult* response) { - _try_to_wait_flushing(); + if (_try_to_wait_flushing()) { + // `_try_to_wait_flushing()` returns true means other thread already + // reduced the mem usage, and current thread do not need to reduce again. 
+ return Status::OK(); + } + std::vector<DeltaWriter*> writers_to_flush; { std::lock_guard<std::mutex> l(_lock); @@ -335,11 +340,24 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request return Status::OK(); } -void TabletsChannel::_try_to_wait_flushing() { +bool TabletsChannel::_try_to_wait_flushing() { + bool duplicate_work = false; std::unique_lock<std::mutex> l(_lock); + // NOTE: we call `reduce_mem_usage()` because we think it's necessary + // to reduce its memory and should not write more data into this + // tablets channel. If there's already some other thread doing the + // reduce-memory work, the only choice for current thread is to wait + // here. + // If current thread does not wait, it has two options: + // 1. continue to write data to current channel. + // 2. pick another tablets channel to flush + // The first choice might cause OOM, the second choice might pick a + // channel that is not big enough. while (_reducing_mem_usage) { + duplicate_work = true; _reduce_memory_cond.wait(l); } + return duplicate_work; } Status TabletsChannel::cancel() { diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index e11e314e6d..cf7e067b32 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -105,7 +105,7 @@ private: // open all writer Status _open_all_writers(const PTabletWriterOpenRequest& request); - void _try_to_wait_flushing(); + bool _try_to_wait_flushing(); // deal with DeltaWriter close_wait(), add tablet to list for return. void _close_wait(DeltaWriter* writer, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org