This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 4380f1ec54 [Enhancement](load) reduce memory by memory size of global delta writer (#14491)
4380f1ec54 is described below

commit 4380f1ec54935cac3a8b271e061942eebe8ffd9f
Author: Xin Liao <liaoxin...@126.com>
AuthorDate: Tue Jan 3 20:05:21 2023 +0800

    [Enhancement](load) reduce memory by memory size of global delta writer (#14491)
---
 be/src/olap/delta_writer.cpp        |  21 ++--
 be/src/runtime/load_channel.cpp     |  31 ------
 be/src/runtime/load_channel.h       |  42 ++++---
 be/src/runtime/load_channel_mgr.cpp | 169 +++++++++++++++-------------
 be/src/runtime/load_channel_mgr.h   |  30 ++---
 be/src/runtime/tablets_channel.cpp  | 215 ++++++++++++------------------
 be/src/runtime/tablets_channel.h    |  34 +++---
 7 files changed, 228 insertions(+), 314 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 0d35a4b49c..22b7f568d1 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -17,14 +17,11 @@
 
 #include "olap/delta_writer.h"
 
-#include "olap/base_compaction.h"
-#include "olap/cumulative_compaction.h"
 #include "olap/data_dir.h"
 #include "olap/memtable.h"
 #include "olap/memtable_flush_executor.h"
 #include "olap/rowset/rowset_writer_context.h"
 #include "olap/schema.h"
-#include "olap/schema_change.h"
 #include "olap/storage_engine.h"
 #include "runtime/load_channel_mgr.h"
 #include "runtime/row_batch.h"
@@ -223,14 +220,16 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
 }
 
 Status DeltaWriter::wait_flush() {
-    std::lock_guard<std::mutex> l(_lock);
-    if (!_is_init) {
-        // return OK instead of Status::Error<ALREADY_CANCELLED>() for same reason
-        // as described in flush_memtable_and_wait()
-        return Status::OK();
-    }
-    if (_is_cancelled) {
-        return _cancel_status;
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        if (!_is_init) {
+            // return OK instead of Status::Error<ALREADY_CANCELLED>() for same reason
+            // as described in flush_memtable_and_wait()
+            return Status::OK();
+        }
+        if (_is_cancelled) {
+            return _cancel_status;
+        }
     }
     RETURN_NOT_OK(_flush_token->wait());
     return Status::OK();
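A note on the DeltaWriter::wait_flush() change above: the state checks still run under _lock, but the new scope block releases the lock before the call blocks on the flush token, so concurrent writes and cancels are no longer stalled behind a long flush. A minimal standalone sketch of the pattern (simplified stand-in types, not the actual Doris classes):

    #include <mutex>

    struct FlushToken {
        void wait() { /* blocks until all queued memtable flushes finish */ }
    };

    struct WriterSketch {
        std::mutex lock;
        bool is_init = false;
        bool is_cancelled = false;
        FlushToken flush_token;

        void wait_flush() {
            {
                std::lock_guard<std::mutex> l(lock); // held only for the state checks
                if (!is_init || is_cancelled) {
                    return;
                }
            } // lock released here, before the potentially long wait
            flush_token.wait(); // other threads may take `lock` while we block
        }
    };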
" - << "load_id=" << _load_id; - } -} - } // namespace doris diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index d3463c3db5..2fad48c79d 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -17,18 +17,17 @@ #pragma once +#include <functional> +#include <map> #include <mutex> #include <ostream> #include <unordered_map> #include <unordered_set> #include "common/status.h" -#include "gen_cpp/PaloInternalService_types.h" -#include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" #include "runtime/memory/mem_tracker.h" #include "runtime/tablets_channel.h" -#include "runtime/thread_context.h" #include "util/uid_util.h" namespace doris { @@ -59,12 +58,6 @@ public: const UniqueId& load_id() const { return _load_id; } - // check if this load channel mem consumption exceeds limit. - // If yes, it will pick a tablets channel to try to reduce memory consumption. - // The method will not return until the chosen tablet channels finished memtable - // flush. - void handle_mem_exceed_limit(); - int64_t mem_consumption() { int64_t mem_usage = 0; { @@ -77,10 +70,37 @@ public: return mem_usage; } + void get_writers_mem_consumption_snapshot( + std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>* + writers_mem_snap) { + std::lock_guard<SpinLock> l(_tablets_channels_lock); + for (auto& it : _tablets_channels) { + std::multimap<int64_t, int64_t, std::greater<int64_t>> tablets_channel_mem; + it.second->get_writers_mem_consumption_snapshot(&tablets_channel_mem); + writers_mem_snap->emplace_back(it.first, std::move(tablets_channel_mem)); + } + } + int64_t timeout() const { return _timeout_s; } bool is_high_priority() const { return _is_high_priority; } + void flush_memtable_async(int64_t index_id, int64_t tablet_id) { + std::lock_guard<std::mutex> l(_lock); + auto it = _tablets_channels.find(index_id); + if (it != _tablets_channels.end()) { + it->second->flush_memtable_async(tablet_id); + } + } + + void wait_flush(int64_t index_id, int64_t tablet_id) { + std::lock_guard<std::mutex> l(_lock); + auto it = _tablets_channels.find(index_id); + if (it != _tablets_channels.end()) { + it->second->wait_flush(tablet_id); + } + } + protected: Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, bool& is_finished, const int64_t index_id); @@ -107,10 +127,6 @@ protected: } private: - // when mem consumption exceeds limit, should call this method to find the channel - // that consumes the largest memory(, and then we can reduce its memory usage). 
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index d3463c3db5..2fad48c79d 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -17,18 +17,17 @@
 
 #pragma once
 
+#include <functional>
+#include <map>
 #include <mutex>
 #include <ostream>
 #include <unordered_map>
 #include <unordered_set>
 
 #include "common/status.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/tablets_channel.h"
-#include "runtime/thread_context.h"
 #include "util/uid_util.h"
 
 namespace doris {
@@ -59,12 +58,6 @@ public:
 
     const UniqueId& load_id() const { return _load_id; }
 
-    // check if this load channel mem consumption exceeds limit.
-    // If yes, it will pick a tablets channel to try to reduce memory consumption.
-    // The method will not return until the chosen tablet channels finished memtable
-    // flush.
-    void handle_mem_exceed_limit();
-
     int64_t mem_consumption() {
         int64_t mem_usage = 0;
         {
@@ -77,10 +70,37 @@ public:
         return mem_usage;
     }
 
+    void get_writers_mem_consumption_snapshot(
+            std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>*
+                    writers_mem_snap) {
+        std::lock_guard<SpinLock> l(_tablets_channels_lock);
+        for (auto& it : _tablets_channels) {
+            std::multimap<int64_t, int64_t, std::greater<int64_t>> tablets_channel_mem;
+            it.second->get_writers_mem_consumption_snapshot(&tablets_channel_mem);
+            writers_mem_snap->emplace_back(it.first, std::move(tablets_channel_mem));
+        }
+    }
+
     int64_t timeout() const { return _timeout_s; }
 
     bool is_high_priority() const { return _is_high_priority; }
 
+    void flush_memtable_async(int64_t index_id, int64_t tablet_id) {
+        std::lock_guard<std::mutex> l(_lock);
+        auto it = _tablets_channels.find(index_id);
+        if (it != _tablets_channels.end()) {
+            it->second->flush_memtable_async(tablet_id);
+        }
+    }
+
+    void wait_flush(int64_t index_id, int64_t tablet_id) {
+        std::lock_guard<std::mutex> l(_lock);
+        auto it = _tablets_channels.find(index_id);
+        if (it != _tablets_channels.end()) {
+            it->second->wait_flush(tablet_id);
+        }
+    }
+
 protected:
     Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, bool& is_finished,
                                 const int64_t index_id);
@@ -107,10 +127,6 @@ protected:
     }
 
 private:
-    // when mem consumption exceeds limit, should call this method to find the channel
-    // that consumes the largest memory(, and then we can reduce its memory usage).
-    bool _find_largest_consumption_channel(std::shared_ptr<TabletsChannel>* channel);
-
    UniqueId _load_id;
     // Tracks the total memory consumed by current load job on this BE
     std::unique_ptr<MemTracker> _mem_tracker;
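The snapshot that the new LoadChannel::get_writers_mem_consumption_snapshot() fills is, per index id, a multimap keyed by writer memory size with std::greater<int64_t>, so iteration visits the heaviest writers first and equal sizes can coexist. A small self-contained illustration of that shape (sizes and tablet ids are made up):

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <map>

    int main() {
        // One tablets channel's snapshot: mem size -> tablet_id, largest first.
        std::multimap<int64_t, int64_t, std::greater<int64_t>> snapshot;
        snapshot.emplace(64 << 20, 1001); // 64 MB
        snapshot.emplace(8 << 20, 1002);  //  8 MB
        snapshot.emplace(64 << 20, 1003); // duplicate keys are why a multimap is used

        // Prints 1001 and 1003 (64 MB each) before 1002 (8 MB).
        for (const auto& [mem, tablet_id] : snapshot) {
            std::cout << "tablet " << tablet_id << ": " << mem << " bytes\n";
        }
        return 0;
    }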
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 15e9ea0c9e..fe4fe4250b 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -17,11 +17,15 @@
 
 #include "runtime/load_channel_mgr.h"
 
-#include "gutil/strings/substitute.h"
+#include <functional>
+#include <map>
+#include <memory>
+#include <queue>
+#include <tuple>
+#include <vector>
+
 #include "runtime/load_channel.h"
 #include "runtime/memory/mem_tracker.h"
-#include "runtime/thread_context.h"
-#include "service/backend_options.h"
 #include "util/doris_metrics.h"
 #include "util/stopwatch.hpp"
 #include "util/time.h"
@@ -71,12 +75,6 @@ LoadChannelMgr::~LoadChannelMgr() {
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
     _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
     _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100;
-    // If a load channel's memory consumption is no more than 10% of the hard limit, it's not
-    // worth to reduce memory on it. Since we only reduce 1/3 memory for one load channel,
-    // for a channel consume 10% of hard limit, we can only release about 3% memory each time,
-    // it's not quite helpfull to reduce memory pressure.
-    // In this case we need to pick multiple load channels to reduce memory more effectively.
-    _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1;
     _mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
     _mem_tracker_set = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD,
                                                            "LoadChannelMgrTrackerSet");
@@ -221,13 +219,16 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
     int64_t process_mem_limit = MemInfo::soft_mem_limit();
+    int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache();
     if (_mem_tracker->consumption() < _load_soft_mem_limit &&
-        MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
+        proc_mem_no_allocator_cache < process_mem_limit) {
         return;
     }
     // Indicate whether current thread is reducing mem on hard limit.
     bool reducing_mem_on_hard_limit = false;
-    std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
+    // tuple<LoadChannel, index_id, tablet_id, mem_size>
+    std::vector<std::tuple<std::shared_ptr<LoadChannel>, int64_t, int64_t, int64_t>>
+            writers_to_reduce_mem;
     {
         std::unique_lock<std::mutex> l(_lock);
         while (_should_wait_flush) {
@@ -236,102 +237,117 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
             _wait_flush_cond.wait(l);
         }
         bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
-                                  MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
+                                  proc_mem_no_allocator_cache >= process_mem_limit;
         // Some other thread is flushing data, and not reached hard limit now,
         // we don't need to handle mem limit in current thread.
         if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
             return;
         }
 
-        // Pick LoadChannels to reduce memory usage, if some other thread is reducing memory
-        // due to soft limit, and we reached hard limit now, current thread may pick some
-        // duplicate channels and trigger duplicate reducing memory process.
-        // But the load channel's reduce memory process is thread safe, only 1 thread can
-        // reduce memory at the same time, other threads will wait on a condition variable,
-        // after the reduce-memory work finished, all threads will return.
-        using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
-        std::vector<ChannelMemPair> candidate_channels;
-        int64_t total_consume = 0;
+        // tuple<LoadChannel, index_id, multimap<mem size, tablet_id>>
+        using WritersMem = std::tuple<std::shared_ptr<LoadChannel>, int64_t,
+                                      std::multimap<int64_t, int64_t, std::greater<int64_t>>>;
+        std::vector<WritersMem> all_writers_mem;
+
+        // tuple<current iterator in multimap, end iterator in multimap, pos in all_writers_mem>
+        using WriterMemItem =
+                std::tuple<std::multimap<int64_t, int64_t, std::greater<int64_t>>::iterator,
+                           std::multimap<int64_t, int64_t, std::greater<int64_t>>::iterator,
+                           size_t>;
+        auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) {
+            return std::get<0>(lhs)->first < std::get<0>(rhs)->first;
+        };
+        std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)>
+                tablets_mem_heap(cmp);
+
         for (auto& kv : _load_channels) {
             if (kv.second->is_high_priority()) {
                 // do not select high priority channel to reduce memory
                 // to avoid blocking them.
                 continue;
             }
-            int64_t mem = kv.second->mem_consumption();
-            // save the mem consumption, since the calculation might be expensive.
-            candidate_channels.push_back(std::make_pair(kv.second, mem));
-            total_consume += mem;
+            std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>
+                    writers_mem_snap;
+            kv.second->get_writers_mem_consumption_snapshot(&writers_mem_snap);
+            for (auto item : writers_mem_snap) {
+                // multimap is empty
+                if (item.second.empty()) {
+                    continue;
+                }
+                all_writers_mem.emplace_back(kv.second, item.first, std::move(item.second));
+                size_t pos = all_writers_mem.size() - 1;
+                tablets_mem_heap.emplace(std::get<2>(all_writers_mem[pos]).begin(),
+                                         std::get<2>(all_writers_mem[pos]).end(), pos);
+            }
         }
 
-        if (candidate_channels.empty()) {
-            // should not happen, add log to observe
-            LOG(WARNING) << "All load channels are high priority, failed to find suitable"
-                         << "channels to reduce memory when total load mem limit exceed";
-            return;
+        // reduce 1/10 memory every time
+        int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
+        int64_t mem_consumption_in_picked_writer = 0;
+        while (!tablets_mem_heap.empty()) {
+            WriterMemItem tablet_mem_item = tablets_mem_heap.top();
+            size_t pos = std::get<2>(tablet_mem_item);
+            auto load_channel = std::get<0>(all_writers_mem[pos]);
+            int64_t index_id = std::get<1>(all_writers_mem[pos]);
+            int64_t tablet_id = std::get<0>(tablet_mem_item)->second;
+            int64_t mem_size = std::get<0>(tablet_mem_item)->first;
+            writers_to_reduce_mem.emplace_back(load_channel, index_id, tablet_id, mem_size);
+            load_channel->flush_memtable_async(index_id, tablet_id);
+            mem_consumption_in_picked_writer += std::get<0>(tablet_mem_item)->first;
+            if (mem_consumption_in_picked_writer > mem_to_flushed) {
+                break;
+            }
+            tablets_mem_heap.pop();
+            if (std::get<0>(tablet_mem_item)++ != std::get<1>(tablet_mem_item)) {
+                tablets_mem_heap.push(tablet_mem_item);
+            }
         }
-        // sort all load channels, try to find the largest one.
-        std::sort(candidate_channels.begin(), candidate_channels.end(),
-                  [](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
-                      return lhs.second > rhs.second;
-                  });
-
-        int64_t mem_consumption_in_picked_channel = 0;
-        auto largest_channel = *candidate_channels.begin();
-        // If some load-channel is big enough, we can reduce it only, try our best to avoid
-        // reducing small load channels.
-        if (_load_channel_min_mem_to_reduce > 0 &&
-            largest_channel.second > _load_channel_min_mem_to_reduce) {
-            // Pick 1 load channel to reduce memory.
-            channels_to_reduce_mem.push_back(largest_channel.first);
-            mem_consumption_in_picked_channel = largest_channel.second;
-        } else {
-            // Pick multiple channels to reduce memory.
-            int64_t mem_to_flushed = total_consume / 3;
-            for (auto ch : candidate_channels) {
-                channels_to_reduce_mem.push_back(ch.first);
-                mem_consumption_in_picked_channel += ch.second;
-                if (mem_consumption_in_picked_channel >= mem_to_flushed) {
-                    break;
-                }
-            }
+        if (writers_to_reduce_mem.empty()) {
+            // should not happen, add log to observe
+            LOG(WARNING) << "failed to find suitable writers to reduce memory"
+                         << " when total load mem limit exceed";
+            return;
         }
 
         std::ostringstream oss;
-        if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
-            oss << "reducing memory of " << channels_to_reduce_mem.size()
-                << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
-                << " bytes), because total load mem consumption "
-                << PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
-                << " has exceeded";
+        oss << "reducing memory of " << writers_to_reduce_mem.size()
+            << " delta writers (total mem: "
+            << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) << ", max mem: "
+            << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.front()))
+            << ", min mem:" << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.back()))
+            << "), ";
+        if (proc_mem_no_allocator_cache < process_mem_limit) {
+            oss << "because total load mem consumption "
+                << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded";
             if (_mem_tracker->consumption() > _load_hard_mem_limit) {
                 _should_wait_flush = true;
                 reducing_mem_on_hard_limit = true;
-                oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
+                oss << " hard limit: " << PrettyPrinter::print_bytes(_load_hard_mem_limit);
             } else {
                 _soft_reduce_mem_in_progress = true;
-                oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
+                oss << " soft limit: " << PrettyPrinter::print_bytes(_load_soft_mem_limit);
             }
         } else {
             _should_wait_flush = true;
             reducing_mem_on_hard_limit = true;
-            oss << "reducing memory of " << channels_to_reduce_mem.size()
-                << " load channels (total mem consumption: " << mem_consumption_in_picked_channel
-                << " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit "
-                << PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
-                << " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str();
+            oss << "because proc_mem_no_allocator_cache consumption "
+                << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache)
+                << ", has exceeded process limit " << PrettyPrinter::print_bytes(process_mem_limit)
+                << ", total load mem consumption: "
+                << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+                << ", vm_rss: " << PerfCounters::get_vm_rss_str()
+                << ", tc/jemalloc allocator cache: " << MemInfo::allocator_cache_mem_str();
         }
         LOG(INFO) << oss.str();
     }
 
-    for (auto ch : channels_to_reduce_mem) {
-        uint64_t begin = GetCurrentTimeMicros();
-        int64_t mem_usage = ch->mem_consumption();
-        ch->handle_mem_exceed_limit();
-        LOG(INFO) << "reduced memory of " << *ch << ", cost "
-                  << (GetCurrentTimeMicros() - begin) / 1000
-                  << " ms, released memory: " << mem_usage - ch->mem_consumption() << " bytes";
+    // wait all writers flush without lock
+    for (auto item : writers_to_reduce_mem) {
+        VLOG_NOTICE << "reducing memory, wait flush load_id: " << std::get<0>(item)->load_id()
+                    << ", index_id: " << std::get<1>(item) << ", tablet_id: " << std::get<2>(item)
+                    << ", mem_size: " << PrettyPrinter::print_bytes(std::get<3>(item));
+        std::get<0>(item)->wait_flush(std::get<1>(item), std::get<2>(item));
     }
 
     {
@@ -345,8 +361,9 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
         if (_soft_reduce_mem_in_progress) {
            _soft_reduce_mem_in_progress = false;
         }
+        // refresh mem tracker to avoid duplicate reduce
+        _refresh_mem_tracker_without_lock();
     }
-    return;
 }
 
 } // namespace doris
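The selection loop above is effectively a k-way merge: each load channel contributes a multimap already sorted by memory size, and a priority_queue of <current, end, pos> iterator tuples repeatedly yields the globally largest delta writer until roughly one tenth of the tracked load memory has been scheduled for flushing. A standalone sketch of the same idea (made-up numbers, simplified types, no locking):

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <map>
    #include <queue>
    #include <tuple>
    #include <vector>

    using MemMap = std::multimap<int64_t, int64_t, std::greater<int64_t>>; // mem -> tablet_id
    // <current position, end, index of owning snapshot> -- mirrors WriterMemItem above.
    using Item = std::tuple<MemMap::iterator, MemMap::iterator, size_t>;

    int main() {
        // Made-up per-channel snapshots, each already sorted largest-first.
        std::vector<MemMap> snapshots = {{{100, 1}, {40, 2}, {5, 3}}, {{70, 10}, {60, 11}}};

        auto cmp = [](const Item& l, const Item& r) {
            return std::get<0>(l)->first < std::get<0>(r)->first; // max-heap on mem size
        };
        std::priority_queue<Item, std::vector<Item>, decltype(cmp)> heap(cmp);
        for (size_t i = 0; i < snapshots.size(); ++i) {
            heap.emplace(snapshots[i].begin(), snapshots[i].end(), i);
        }

        int64_t target = 180; // stand-in for _mem_tracker->consumption() / 10
        int64_t picked = 0;
        while (!heap.empty() && picked <= target) {
            Item item = heap.top();
            heap.pop();
            auto it = std::get<0>(item);
            picked += it->first;
            std::cout << "flush tablet " << it->second << " (" << it->first << " bytes)\n";
            if (++std::get<0>(item) != std::get<1>(item)) {
                heap.push(item); // advance to this snapshot's next-largest writer
            }
        }
        // Prints tablets 1 (100), 10 (70), 11 (60); stops once `target` is exceeded.
        return 0;
    }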
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 9bf604d87e..967617e9bc 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -24,14 +24,10 @@
 #include <unordered_map>
 
 #include "common/status.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "gutil/ref_counted.h"
 #include "olap/lru_cache.h"
 #include "runtime/load_channel.h"
-#include "runtime/tablets_channel.h"
-#include "runtime/thread_context.h"
 #include "util/countdown_latch.h"
 #include "util/thread.h"
 #include "util/uid_util.h"
@@ -59,14 +55,8 @@ public:
     Status cancel(const PTabletWriterCancelRequest& request);
 
     void refresh_mem_tracker() {
-        int64_t mem_usage = 0;
-        {
-            std::lock_guard<std::mutex> l(_lock);
-            for (auto& kv : _load_channels) {
-                mem_usage += kv.second->mem_consumption();
-            }
-        }
-        _mem_tracker->set_consumption(mem_usage);
+        std::lock_guard<std::mutex> l(_lock);
+        _refresh_mem_tracker_without_lock();
     }
 
     MemTrackerLimiter* mem_tracker_set() { return _mem_tracker_set.get(); }
@@ -82,6 +72,15 @@ private:
 
     Status _start_bg_worker();
 
+    // lock should be held when calling this method
+    void _refresh_mem_tracker_without_lock() {
+        int64_t mem_usage = 0;
+        for (auto& kv : _load_channels) {
+            mem_usage += kv.second->mem_consumption();
+        }
+        _mem_tracker->set_consumption(mem_usage);
+    }
+
 protected:
     // lock protect the load channel map
     std::mutex _lock;
@@ -95,13 +94,6 @@ protected:
     std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
     int64_t _load_hard_mem_limit = -1;
     int64_t _load_soft_mem_limit = -1;
-    // By default, we try to reduce memory on the load channel with largest mem consumption,
-    // but if there are lots of small load channel, even the largest one consumes very
-    // small memory, in this case we need to pick multiple load channels to reduce memory
-    // more effectively.
-    // `_load_channel_min_mem_to_reduce` is used to determine whether the largest load channel's
-    // memory consumption is big enough.
-    int64_t _load_channel_min_mem_to_reduce = -1;
     bool _soft_reduce_mem_in_progress = false;
 
     // If hard limit reached, one thread will trigger load channel flush,
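refresh_mem_tracker() now takes the lock and delegates to _refresh_mem_tracker_without_lock(), which the commit also calls from _handle_mem_exceed_limit() while _lock is already held. A minimal sketch of that lock-then-delegate pattern (hypothetical class, not the Doris one):

    #include <mutex>

    class TrackerSketch {
    public:
        void refresh() {
            std::lock_guard<std::mutex> l(_lock);
            _refresh_without_lock();
        }

        void after_flush() {
            std::lock_guard<std::mutex> l(_lock);
            // ...update other state under the same lock...
            _refresh_without_lock(); // no second lock(), so no self-deadlock
        }

    private:
        // Caller must hold _lock.
        void _refresh_without_lock() { _consumption = 0; /* re-sum channel usage here */ }

        std::mutex _lock;
        long _consumption = 0;
    };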
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 483fd22edd..089b5ba4be 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -23,10 +23,7 @@
 #include "olap/storage_engine.h"
 #include "runtime/load_channel.h"
 #include "runtime/row_batch.h"
-#include "runtime/thread_context.h"
-#include "runtime/tuple_row.h"
 #include "util/doris_metrics.h"
-#include "util/time.h"
 
 namespace doris {
 
@@ -208,136 +205,16 @@ int64_t TabletsChannel::mem_consumption() {
     int64_t mem_usage = 0;
     {
         std::lock_guard<SpinLock> l(_tablet_writers_lock);
+        _mem_consumptions.clear();
         for (auto& it : _tablet_writers) {
-            mem_usage += it.second->mem_consumption();
+            int64_t writer_mem = it.second->mem_consumption();
+            mem_usage += writer_mem;
+            _mem_consumptions.emplace(writer_mem, it.first);
         }
     }
     return mem_usage;
 }
 
-void TabletsChannel::reduce_mem_usage() {
-    if (_try_to_wait_flushing()) {
-        // `_try_to_wait_flushing()` returns true means other thread already
-        // reduced the mem usage, and current thread do not need to reduce again.
-        LOG(INFO) << "Duplicate reduce mem usage on TabletsChannel, txn_id: " << _txn_id
-                  << ", index_id: " << _index_id;
-        return;
-    }
-
-    std::vector<DeltaWriter*> writers_to_wait_flush;
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        if (_state == kFinished) {
-            // TabletsChannel is closed without LoadChannel's lock,
-            // therefore it's possible for reduce_mem_usage() to be called right after close()
-            LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id
-                      << ", index_id: " << _index_id;
-            return;
-        }
-
-        // Sort the DeltaWriters by mem consumption in descend order.
-        std::vector<DeltaWriter*> writers;
-        for (auto& it : _tablet_writers) {
-            it.second->save_mem_consumption_snapshot();
-            writers.push_back(it.second);
-        }
-        int64_t total_memtable_consumption_in_flush = 0;
-        for (auto writer : writers) {
-            if (writer->get_memtable_consumption_inflush() > 0) {
-                writers_to_wait_flush.push_back(writer);
-                total_memtable_consumption_in_flush += writer->get_memtable_consumption_inflush();
-            }
-        }
-        std::sort(writers.begin(), writers.end(),
-                  [](const DeltaWriter* lhs, const DeltaWriter* rhs) {
-                      return lhs->get_memtable_consumption_snapshot() >
-                             rhs->get_memtable_consumption_snapshot();
-                  });
-
-        // Decide which writes should be flushed to reduce mem consumption.
-        // The main idea is to flush at least one third of the mem_limit.
-        // This is mainly to solve the following scenarios.
-        // Suppose there are N tablets in this TabletsChannel, and the mem limit is M.
-        // If the data is evenly distributed, when each tablet memory accumulates to M/N,
-        // the reduce memory operation will be triggered.
-        // At this time, the value of M/N may be much smaller than the value of `write_buffer_size`.
-        // If we flush all the tablets at this time, each tablet will generate a lot of small files.
-        // So here we only flush part of the tablet, and the next time the reduce memory operation is triggered,
-        // the tablet that has not been flushed before will accumulate more data, thereby reducing the number of flushes.
-
-        int64_t mem_to_flushed = mem_consumption() / 3;
-        if (total_memtable_consumption_in_flush < mem_to_flushed) {
-            mem_to_flushed -= total_memtable_consumption_in_flush;
-            int counter = 0;
-            int64_t sum = 0;
-            for (auto writer : writers) {
-                if (writer->mem_consumption() <= 0) {
-                    break;
-                }
-                ++counter;
-                sum += writer->mem_consumption();
-                if (sum > mem_to_flushed) {
-                    break;
-                }
-            }
-            std::ostringstream ss;
-            ss << "total size of memtables in flush: " << total_memtable_consumption_in_flush
-               << " will flush " << counter << " more memtables to reduce memory: " << sum;
-            if (counter > 0) {
-                ss << ", the size of smallest memtable to flush is "
-                   << writers[counter - 1]->get_memtable_consumption_snapshot() << " bytes";
-            }
-            LOG(INFO) << ss.str();
-            // following loop flush memtable async, we'll do it with _lock
-            for (int i = 0; i < counter; i++) {
-                Status st = writers[i]->flush_memtable_and_wait(false);
-                if (!st.ok()) {
-                    LOG_WARNING(
-                            "tablet writer failed to reduce mem consumption by flushing memtable")
-                            .tag("tablet_id", writers[i]->tablet_id())
-                            .tag("txn_id", _txn_id)
-                            .error(st);
-                    writers[i]->cancel_with_status(st);
-                    _broken_tablets.insert(writers[i]->tablet_id());
-                }
-            }
-            for (int i = 0; i < counter; i++) {
-                if (_broken_tablets.find(writers[i]->tablet_id()) != _broken_tablets.end()) {
-                    // skip broken tablets
-                    continue;
-                }
-                writers_to_wait_flush.push_back(writers[i]);
-            }
-            _reducing_mem_usage = true;
-        } else {
-            LOG(INFO) << "total size of memtables in flush is big enough: "
-                      << total_memtable_consumption_in_flush
-                      << " bytes, will not flush more memtables";
-        }
-    }
-
-    for (auto writer : writers_to_wait_flush) {
-        Status st = writer->wait_flush();
-        if (!st.ok()) {
-            LOG_WARNING(
-                    "tablet writer failed to reduce mem consumption by waiting flushing memtable")
-                    .tag("tablet_id", writer->tablet_id())
-                    .tag("txn_id", _txn_id)
-                    .error(st);
-            writer->cancel_with_status(st);
-            _broken_tablets.insert(writer->tablet_id());
-        }
-    }
-
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        _reducing_mem_usage = false;
-        _reduce_memory_cond.notify_all();
-    }
-
-    return;
-}
-
 Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
     std::vector<SlotDescriptor*>* index_slots = nullptr;
     int32_t schema_hash = 0;
@@ -387,26 +264,6 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
     return Status::OK();
 }
 
-bool TabletsChannel::_try_to_wait_flushing() {
-    bool duplicate_work = false;
-    std::unique_lock<std::mutex> l(_lock);
-    // NOTE: we call `reduce_mem_usage()` because we think it's necessary
-    // to reduce it's memory and should not write more data into this
-    // tablets channel. If there's already some other thead doing the
-    // reduce-memory work, the only choice for current thread is to wait
-    // here.
-    // If current thread do not wait, it has two options:
-    // 1. continue to write data to current channel.
-    // 2. pick another tablets channel to flush
-    // The first choice might cause OOM, the second choice might pick a
-    // channel that is not big enough.
-    while (_reducing_mem_usage) {
-        duplicate_work = true;
-        _reduce_memory_cond.wait(l);
-    }
-    return duplicate_work;
-}
-
 Status TabletsChannel::cancel() {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
@@ -500,6 +357,70 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
     return Status::OK();
 }
 
+void TabletsChannel::flush_memtable_async(int64_t tablet_id) {
+    std::lock_guard<std::mutex> l(_lock);
+    if (_state == kFinished) {
+        // TabletsChannel is closed without LoadChannel's lock,
+        // therefore it's possible for reduce_mem_usage() to be called right after close()
+        LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id
+                  << ", index_id: " << _index_id;
+        return;
+    }
+
+    auto iter = _tablet_writers.find(tablet_id);
+    if (iter == _tablet_writers.end()) {
+        return;
+    }
+
+    if (!(_reducing_tablets.insert(tablet_id).second)) {
+        return;
+    }
+
+    Status st = iter->second->flush_memtable_and_wait(false);
+    if (!st.ok()) {
+        auto err_msg = fmt::format(
+                "tablet writer failed to reduce mem consumption by flushing memtable, "
+                "tablet_id={}, txn_id={}, err={}",
+                tablet_id, _txn_id, st);
+        LOG(WARNING) << err_msg;
+        iter->second->cancel_with_status(st);
+        _broken_tablets.insert(iter->second->tablet_id());
+    }
+}
+
+void TabletsChannel::wait_flush(int64_t tablet_id) {
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        if (_state == kFinished) {
+            // TabletsChannel is closed without LoadChannel's lock,
+            // therefore it's possible for reduce_mem_usage() to be called right after close()
+            LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id
+                      << ", index_id: " << _index_id;
+            return;
+        }
+    }
+
+    auto iter = _tablet_writers.find(tablet_id);
+    if (iter == _tablet_writers.end()) {
+        return;
+    }
+    Status st = iter->second->wait_flush();
+    if (!st.ok()) {
+        auto err_msg = fmt::format(
+                "tablet writer failed to reduce mem consumption by flushing memtable, "
+                "tablet_id={}, txn_id={}, err={}",
+                tablet_id, _txn_id, st);
+        LOG(WARNING) << err_msg;
+        iter->second->cancel_with_status(st);
+        _broken_tablets.insert(iter->second->tablet_id());
+    }
+
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        _reducing_tablets.erase(tablet_id);
+    }
+}
+
 template Status
 TabletsChannel::add_batch<PTabletWriterAddBlockRequest, PTabletWriterAddBlockResult>(
         PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*);
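_reducing_tablets acts as a per-tablet dedup gate in the two methods added above: flush_memtable_async() only triggers a flush when insert() actually adds the tablet id, and wait_flush() erases the id once the flush has been awaited. A small sketch of the idiom (locking omitted; the real methods run this under _lock):

    #include <cstdint>
    #include <unordered_set>

    // std::unordered_set::insert() returns {iterator, bool}; the bool is false
    // when the element already exists, so a second flush request is a no-op.
    std::unordered_set<int64_t> reducing_tablets;

    bool try_begin_reduce(int64_t tablet_id) {
        return reducing_tablets.insert(tablet_id).second; // false -> already in flight
    }

    void end_reduce(int64_t tablet_id) {
        reducing_tablets.erase(tablet_id); // tablet may be picked again later
    }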
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index c774994cb2..e649852a47 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -18,22 +18,19 @@
 #pragma once
 
 #include <cstdint>
+#include <functional>
+#include <map>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
-#include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
-#include "runtime/memory/mem_tracker.h"
-#include "runtime/thread_context.h"
 #include "util/bitmap.h"
-#include "util/countdown_latch.h"
-#include "util/priority_thread_pool.hpp"
 #include "util/uid_util.h"
-#include "vec/core/block.h"
 
 namespace doris {
 
@@ -87,14 +84,17 @@ public:
     // no-op when this channel has been closed or cancelled
     Status cancel();
 
-    // upper application may call this to try to reduce the mem usage of this channel.
-    // eg. flush the largest memtable immediately.
-    // return Status::OK if mem is reduced.
-    // no-op when this channel has been closed or cancelled
-    void reduce_mem_usage();
-
     int64_t mem_consumption();
 
+    void get_writers_mem_consumption_snapshot(
+            std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions) {
+        std::lock_guard<SpinLock> l(_tablet_writers_lock);
+        *mem_consumptions = _mem_consumptions;
+    }
+
+    void flush_memtable_async(int64_t tablet_id);
+    void wait_flush(int64_t tablet_id);
+
 private:
     template <typename Request>
     Status _get_current_seq(int64_t& cur_seq, const Request& request);
@@ -151,11 +151,7 @@ private:
     // So that following batch will not handle this tablet anymore.
     std::unordered_set<int64_t> _broken_tablets;
 
-    bool _reducing_mem_usage = false;
-    // only one thread can reduce memory for one TabletsChannel.
-    // if some other thread call `reduce_memory_usage` at the same time,
-    // it will wait on this condition variable.
-    std::condition_variable _reduce_memory_cond;
+    std::unordered_set<int64_t> _reducing_tablets;
 
     std::unordered_set<int64_t> _partition_ids;
 
@@ -164,6 +160,10 @@ private:
     bool _is_high_priority = false;
 
     bool _write_single_replica = false;
+
+    // mem -> tablet_id
+    // sort by memory size
+    std::multimap<int64_t, int64_t, std::greater<int64_t>> _mem_consumptions;
 };
 
 template <typename Request>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org