This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ee754307bb [refactor](load) refactor memtable flush actively (#21634) ee754307bb is described below commit ee754307bb62f509a929098421039a2d9acae579 Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Sun Jul 30 21:31:54 2023 +0800 [refactor](load) refactor memtable flush actively (#21634) --- be/src/common/config.cpp | 4 +- be/src/common/config.h | 4 +- be/src/common/daemon.cpp | 18 +- be/src/common/daemon.h | 4 +- be/src/olap/delta_writer.cpp | 6 +- be/src/olap/delta_writer.h | 2 + be/src/olap/memtable.cpp | 5 +- be/src/olap/memtable_memory_limiter.cpp | 222 +++++++++++++++++++++++ be/src/olap/memtable_memory_limiter.h | 71 ++++++++ be/src/runtime/exec_env.h | 3 + be/src/runtime/exec_env_init.cpp | 4 + be/src/runtime/load_channel.cpp | 13 +- be/src/runtime/load_channel.h | 64 ++----- be/src/runtime/load_channel_mgr.cpp | 215 ++-------------------- be/src/runtime/load_channel_mgr.h | 43 ++--- be/src/runtime/memory/mem_tracker_limiter.cpp | 4 +- be/src/runtime/tablets_channel.cpp | 85 --------- be/src/runtime/tablets_channel.h | 9 +- be/src/util/doris_metrics.h | 1 + be/test/olap/memtable_memory_limiter_test.cpp | 182 +++++++++++++++++++ docs/en/docs/admin-manual/config/be-config.md | 5 + docs/zh-CN/docs/admin-manual/config/be-config.md | 5 + 22 files changed, 574 insertions(+), 395 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index bbf7c3c6b0..74af2f754a 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -551,8 +551,8 @@ DEFINE_mInt32(memory_maintenance_sleep_time_ms, "100"); // After minor gc, no minor gc during sleep, but full gc is possible. 
DEFINE_mInt32(memory_gc_sleep_time_ms, "1000"); -// Sleep time in milliseconds between load channel memory refresh iterations -DEFINE_mInt64(load_channel_memory_refresh_sleep_time_ms, "100"); +// Sleep time in milliseconds between memtable flush mgr refresh iterations +DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "100"); // Alignment DEFINE_Int32(memory_max_alignment, "16"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 1f196d0904..c6e54d45b7 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -592,8 +592,8 @@ DECLARE_mInt32(memory_maintenance_sleep_time_ms); // After minor gc, no minor gc during sleep, but full gc is possible. DECLARE_mInt32(memory_gc_sleep_time_ms); -// Sleep time in milliseconds between load channel memory refresh iterations -DECLARE_mInt64(load_channel_memory_refresh_sleep_time_ms); +// Sleep time in milliseconds between memtable flush mgr memory refresh iterations +DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms); // Alignment DECLARE_Int32(memory_max_alignment); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index cfff81088c..e1d6e97bc1 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -40,12 +40,12 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "olap/memtable_memory_limiter.h" #include "olap/options.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" #include "runtime/block_spill_manager.h" #include "runtime/exec_env.h" -#include "runtime/load_channel_mgr.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/task_group/task_group_manager.h" @@ -276,14 +276,14 @@ void Daemon::memory_gc_thread() { } } -void Daemon::load_channel_tracker_refresh_thread() { +void Daemon::memtable_memory_limiter_tracker_refresh_thread() { // Refresh the memory statistics of the load channel tracker more frequently, // which helps to accurately control
the memory of LoadChannelMgr. while (!_stop_background_threads_latch.wait_for( - std::chrono::milliseconds(config::load_channel_memory_refresh_sleep_time_ms)) && + std::chrono::milliseconds(config::memtable_mem_tracker_refresh_interval_ms)) && !k_doris_exit) { if (ExecEnv::GetInstance()->initialized()) { - doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker(); + doris::ExecEnv::GetInstance()->memtable_memory_limiter()->refresh_mem_tracker(); } } } @@ -464,9 +464,9 @@ void Daemon::start() { &_memory_gc_thread); CHECK(st.ok()) << st; st = Thread::create( - "Daemon", "load_channel_tracker_refresh_thread", - [this]() { this->load_channel_tracker_refresh_thread(); }, - &_load_channel_tracker_refresh_thread); + "Daemon", "memtable_memory_limiter_tracker_refresh_thread", + [this]() { this->memtable_memory_limiter_tracker_refresh_thread(); }, + &_memtable_memory_limiter_tracker_refresh_thread); CHECK(st.ok()) << st; st = Thread::create( "Daemon", "memory_tracker_profile_refresh_thread", @@ -498,8 +498,8 @@ void Daemon::stop() { if (_memory_gc_thread) { _memory_gc_thread->join(); } - if (_load_channel_tracker_refresh_thread) { - _load_channel_tracker_refresh_thread->join(); + if (_memtable_memory_limiter_tracker_refresh_thread) { + _memtable_memory_limiter_tracker_refresh_thread->join(); } if (_memory_tracker_profile_refresh_thread) { _memory_tracker_profile_refresh_thread->join(); diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 0d840d6452..39a8cd59f3 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -47,7 +47,7 @@ private: void tcmalloc_gc_thread(); void memory_maintenance_thread(); void memory_gc_thread(); - void load_channel_tracker_refresh_thread(); + void memtable_memory_limiter_tracker_refresh_thread(); void memory_tracker_profile_refresh_thread(); void calculate_metrics_thread(); void block_spill_gc_thread(); @@ -56,7 +56,7 @@ private: scoped_refptr<Thread> _tcmalloc_gc_thread; scoped_refptr<Thread> 
_memory_maintenance_thread; scoped_refptr<Thread> _memory_gc_thread; - scoped_refptr<Thread> _load_channel_tracker_refresh_thread; + scoped_refptr<Thread> _memtable_memory_limiter_tracker_refresh_thread; scoped_refptr<Thread> _memory_tracker_profile_refresh_thread; scoped_refptr<Thread> _calculate_metrics_thread; scoped_refptr<Thread> _block_spill_gc_thread; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 88a7aa3b15..c3fed74c39 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -38,6 +38,7 @@ #include "io/fs/file_writer.h" // IWYU pragma: keep #include "olap/memtable.h" #include "olap/memtable_flush_executor.h" +#include "olap/memtable_memory_limiter.h" #include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/beta_rowset_writer.h" @@ -51,7 +52,6 @@ #include "olap/tablet_meta.h" #include "olap/txn_manager.h" #include "runtime/exec_env.h" -#include "runtime/load_channel_mgr.h" #include "runtime/memory/mem_tracker.h" #include "service/backend_options.h" #include "util/brpc_client_cache.h" @@ -334,11 +334,11 @@ void DeltaWriter::_reset_mem_table() { auto mem_table_insert_tracker = std::make_shared<MemTracker>( fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()), - ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); + ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()); auto mem_table_flush_tracker = std::make_shared<MemTracker>( fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()), - ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); + ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()); #else auto mem_table_insert_tracker = std::make_shared<MemTracker>( fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", diff --git 
a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index c4dc830af5..e45a8752e4 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -123,6 +123,8 @@ public: int64_t tablet_id() { return _tablet->tablet_id(); } + int64_t txn_id() { return _req.txn_id; } + void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed); int64_t total_received_rows() const { return _total_received_rows; } diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index dc3d9d8be7..95854c640d 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -27,11 +27,12 @@ #include <vector> #include "common/config.h" +#include "olap/memtable_memory_limiter.h" #include "olap/olap_define.h" #include "olap/tablet_schema.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" -#include "runtime/load_channel_mgr.h" +#include "runtime/thread_context.h" #include "tablet_meta.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" @@ -61,7 +62,7 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, #ifndef BE_TEST _insert_mem_tracker_use_hook = std::make_unique<MemTracker>( fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id)), - ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); + ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()); #else _insert_mem_tracker_use_hook = std::make_unique<MemTracker>( fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id))); diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp new file mode 100644 index 0000000000..bec748db6c --- /dev/null +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memtable_memory_limiter.h" + +#include "common/config.h" +#include "olap/delta_writer.h" +#include "util/doris_metrics.h" +#include "util/mem_info.h" +#include "util/metrics.h" + +namespace doris { +DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption, MetricUnit::BYTES, "", + memtable_memory_limiter_mem_consumption, + Labels({{"type", "load"}})); + +// Calculate the total memory limit of all load tasks on this BE +static int64_t calc_process_max_load_memory(int64_t process_mem_limit) { + if (process_mem_limit == -1) { + // no limit + return -1; + } + int32_t max_load_memory_percent = config::load_process_max_memory_limit_percent; + return process_mem_limit * max_load_memory_percent / 100; +} + +MemTableMemoryLimiter::MemTableMemoryLimiter() {} + +MemTableMemoryLimiter::~MemTableMemoryLimiter() { + DEREGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption); + for (auto writer : _writers) { + if (writer != nullptr) { + delete writer; + writer = nullptr; + } + } + _writers.clear(); +} + +Status MemTableMemoryLimiter::init(int64_t process_mem_limit) { + _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit); + _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100; + _mem_tracker = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD, + "MemTableMemoryLimiter"); + 
REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption, + [this]() { return _mem_tracker->consumption(); }); + return Status::OK(); +} + +void MemTableMemoryLimiter::register_writer(DeltaWriter* writer) { + std::lock_guard<std::mutex> l(_lock); + _writers.insert(writer); +} + +void MemTableMemoryLimiter::deregister_writer(DeltaWriter* writer) { + std::lock_guard<std::mutex> l(_lock); + _writers.erase(writer); +} + +void MemTableMemoryLimiter::handle_memtable_flush() { + // Check the soft limit. + DCHECK(_load_soft_mem_limit > 0); + // Record current memory status. + int64_t process_soft_mem_limit = MemInfo::soft_mem_limit(); + int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache(); +#ifndef BE_TEST + // If process memory is almost full but data load don't consume more than 5% (50% * 10%) of + // total memory, we don't need to flush memtable. + bool reduce_on_process_soft_mem_limit = + proc_mem_no_allocator_cache >= process_soft_mem_limit && + _mem_tracker->consumption() >= _load_hard_mem_limit / 10; + if (_mem_tracker->consumption() < _load_soft_mem_limit && !reduce_on_process_soft_mem_limit) { + return; + } +#endif + // Indicate whether current thread is reducing mem on hard limit. + bool reducing_mem_on_hard_limit = false; + Status st; + std::vector<WriterMemItem> writers_to_reduce_mem; + { + MonotonicStopWatch timer; + timer.start(); + std::unique_lock<std::mutex> l(_lock); + while (_should_wait_flush) { + _wait_flush_cond.wait(l); + } + LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit + << ", waited for flush, time_ns:" << timer.elapsed_time(); +#ifndef BE_TEST + bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit || + proc_mem_no_allocator_cache >= process_soft_mem_limit; + // Some other thread is flushing data, and not reached hard limit now, + // we don't need to handle mem limit in current thread. 
+ if (_soft_reduce_mem_in_progress && !hard_limit_reached) { + return; + } +#endif + + auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { + return lhs.mem_size < rhs.mem_size; + }; + std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> mem_heap(cmp); + + for (auto& writer : _writers) { + int64_t active_memtable_mem = writer->active_memtable_mem_consumption(); + mem_heap.emplace(writer, active_memtable_mem); + } + int64_t mem_to_flushed = _mem_tracker->consumption() / 10; + int64_t mem_consumption_in_picked_writer = 0; + while (!mem_heap.empty()) { + WriterMemItem mem_item = mem_heap.top(); + auto writer = mem_item.writer; + int64_t mem_size = mem_item.mem_size; + writers_to_reduce_mem.emplace_back(writer, mem_size); + st = writer->flush_memtable_and_wait(false); + if (!st.ok()) { + auto err_msg = fmt::format( + "tablet writer failed to reduce mem consumption by flushing memtable, " + "tablet_id={}, txn_id={}, err={}", + writer->tablet_id(), writer->txn_id(), st.to_string()); + LOG(WARNING) << err_msg; + writer->cancel_with_status(st); + } + mem_consumption_in_picked_writer += mem_size; + if (mem_consumption_in_picked_writer > mem_to_flushed) { + break; + } + mem_heap.pop(); + } + if (writers_to_reduce_mem.empty()) { + // should not happen, add log to observe + LOG(WARNING) << "failed to find suitable writers to reduce memory" + << " when total load mem limit exceed"; + return; + } + + std::ostringstream oss; + oss << "reducing memory of " << writers_to_reduce_mem.size() + << " delta writers (total mem: " + << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) + << ", max mem: " << PrettyPrinter::print_bytes(writers_to_reduce_mem.front().mem_size) + << ", min mem:" << PrettyPrinter::print_bytes(writers_to_reduce_mem.back().mem_size) + << "), "; + if (proc_mem_no_allocator_cache < process_soft_mem_limit) { + oss << "because total load mem consumption " + << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has 
exceeded"; + if (_mem_tracker->consumption() > _load_hard_mem_limit) { + _should_wait_flush = true; + reducing_mem_on_hard_limit = true; + oss << " hard limit: " << PrettyPrinter::print_bytes(_load_hard_mem_limit); + } else { + _soft_reduce_mem_in_progress = true; + oss << " soft limit: " << PrettyPrinter::print_bytes(_load_soft_mem_limit); + } + } else { + _should_wait_flush = true; + reducing_mem_on_hard_limit = true; + oss << "because proc_mem_no_allocator_cache consumption " + << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache) + << ", has exceeded process soft limit " + << PrettyPrinter::print_bytes(process_soft_mem_limit) + << ", total load mem consumption: " + << PrettyPrinter::print_bytes(_mem_tracker->consumption()) + << ", vm_rss: " << PerfCounters::get_vm_rss_str(); + } + LOG(INFO) << oss.str(); + } + + // wait all writers flush without lock + for (auto item : writers_to_reduce_mem) { + VLOG_NOTICE << "reducing memory, wait flush mem_size: " + << PrettyPrinter::print_bytes(item.mem_size); + st = item.writer->wait_flush(); + if (!st.ok()) { + auto err_msg = fmt::format( + "tablet writer failed to reduce mem consumption by flushing memtable, " + "tablet_id={}, txn_id={}, err={}", + item.writer->tablet_id(), item.writer->txn_id(), st.to_string()); + LOG(WARNING) << err_msg; + item.writer->cancel_with_status(st); + } + } + + { + std::lock_guard<std::mutex> l(_lock); + // If a thread have finished the memtable flush for soft limit, and now + // the hard limit is already reached, it should not update these variables. 
+ if (reducing_mem_on_hard_limit && _should_wait_flush) { + _should_wait_flush = false; + _wait_flush_cond.notify_all(); + } + if (_soft_reduce_mem_in_progress) { + _soft_reduce_mem_in_progress = false; + } + // refresh mem tracker to avoid duplicate reduce + _refresh_mem_tracker_without_lock(); + } +} + +void MemTableMemoryLimiter::_refresh_mem_tracker_without_lock() { + _mem_usage = 0; + for (auto& writer : _writers) { + _mem_usage += writer->mem_consumption(MemType::ALL); + } + THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), _mem_tracker.get()); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h new file mode 100644 index 0000000000..37cb710108 --- /dev/null +++ b/be/src/olap/memtable_memory_limiter.h @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#pragma once + +#include <stdint.h> + +#include "common/status.h" +#include "runtime/memory/mem_tracker_limiter.h" +#include "util/countdown_latch.h" + +namespace doris { +class DeltaWriter; +struct WriterMemItem { + DeltaWriter* writer; + int64_t mem_size; +}; +class MemTableMemoryLimiter { +public: + MemTableMemoryLimiter(); + ~MemTableMemoryLimiter(); + + Status init(int64_t process_mem_limit); + + // check if the total mem consumption exceeds limit. + // If yes, it will flush memtable to try to reduce memory consumption. + void handle_memtable_flush(); + + void register_writer(DeltaWriter* writer); + + void deregister_writer(DeltaWriter* writer); + + void refresh_mem_tracker() { + std::lock_guard<std::mutex> l(_lock); + _refresh_mem_tracker_without_lock(); + } + + MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); } + +private: + void _refresh_mem_tracker_without_lock(); + + std::mutex _lock; + // If hard limit reached, one thread will trigger load channel flush, + // other threads should wait on the condition variable. 
+ bool _should_wait_flush = false; + std::condition_variable _wait_flush_cond; + int64_t _mem_usage = 0; + + std::unique_ptr<MemTrackerLimiter> _mem_tracker; + int64_t _load_hard_mem_limit = -1; + int64_t _load_soft_mem_limit = -1; + bool _soft_reduce_mem_in_progress = false; + + std::unordered_set<DeltaWriter*> _writers; +}; +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 056f2ca125..3077eaa3f5 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -27,6 +27,7 @@ #include <vector> #include "common/status.h" +#include "olap/memtable_memory_limiter.h" #include "olap/options.h" #include "util/threadpool.h" @@ -176,6 +177,7 @@ public: HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; } doris::vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } FileMetaCache* file_meta_cache() { return _file_meta_cache; } + MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); } // only for unit test void set_master_info(TMasterInfo* master_info) { this->_master_info = master_info; } @@ -261,6 +263,7 @@ private: BlockSpillManager* _block_spill_mgr = nullptr; // To save meta info of external file, such as parquet footer. 
FileMetaCache* _file_meta_cache = nullptr; + std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index f200d0a46f..ddec82544d 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -37,6 +37,7 @@ #include "common/logging.h" #include "common/status.h" #include "io/fs/file_meta_cache.h" +#include "olap/memtable_memory_limiter.h" #include "olap/olap_define.h" #include "olap/options.h" #include "olap/page_cache.h" @@ -168,6 +169,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); _block_spill_mgr = new BlockSpillManager(_store_paths); _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); + _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>(); _backend_client_cache->init_metrics("backend"); _frontend_client_cache->init_metrics("frontend"); @@ -184,6 +186,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _init_mem_env(); + RETURN_IF_ERROR(_memtable_memory_limiter->init(MemInfo::mem_limit())); RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit())); _heartbeat_flags = new HeartbeatFlags(); _register_metrics(); @@ -407,6 +410,7 @@ void ExecEnv::_destroy() { SAFE_DELETE(_master_info); _new_load_stream_mgr.reset(); + _memtable_memory_limiter.reset(nullptr); _send_batch_thread_pool.reset(nullptr); _buffered_reader_prefetch_thread_pool.reset(nullptr); _send_report_thread_pool.reset(nullptr); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 29ebb63725..90432a386d 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -25,11 +25,9 @@ namespace doris { -LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> mem_tracker, - int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, - 
int64_t backend_id, bool enable_profile) +LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority, + const std::string& sender_ip, int64_t backend_id, bool enable_profile) : _load_id(load_id), - _mem_tracker(std::move(mem_tracker)), _timeout_s(timeout_s), _is_high_priority(is_high_priority), _sender_ip(sender_ip), @@ -43,9 +41,9 @@ LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> me } LoadChannel::~LoadChannel() { - LOG(INFO) << "load channel removed. mem peak usage=" << _mem_tracker->peak_consumption() - << ", info=" << _mem_tracker->debug_string() << ", load_id=" << _load_id - << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip; + LOG(INFO) << "load channel removed" + << " load_id=" << _load_id << ", is high priority=" << _is_high_priority + << ", sender_ip=" << _sender_ip; } void LoadChannel::_init_profile() { @@ -148,7 +146,6 @@ void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) { return; } - COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption()); // TabletSink and LoadChannel in BE are M: N relationship, // Every once in a while LoadChannel will randomly return its own runtime profile to a TabletSink, // so usually all LoadChannel runtime profiles are saved on each TabletSink, diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 3b5b4b2d58..0ad24c5697 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -35,6 +35,8 @@ #include <vector> #include "common/status.h" +#include "olap/memtable_memory_limiter.h" +#include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "runtime/tablets_channel.h" #include "util/runtime_profile.h" @@ -52,9 +54,8 @@ class OpenPartitionRequest; // corresponding to a certain load job class LoadChannel { public: - LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> mem_tracker, int64_t timeout_s, - bool is_high_priority, 
const std::string& sender_ip, int64_t backend_id, - bool enable_profile); + LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority, + const std::string& sender_ip, int64_t backend_id, bool enable_profile); ~LoadChannel(); // open a new load channel if not exist @@ -73,52 +74,17 @@ public: const UniqueId& load_id() const { return _load_id; } - int64_t mem_consumption() { - int64_t mem_usage = 0; - { - std::lock_guard<SpinLock> l(_tablets_channels_lock); - for (auto& it : _tablets_channels) { - mem_usage += it.second->mem_consumption(); - } - } - _mem_tracker->set_consumption(mem_usage); - return mem_usage; - } - - void get_active_memtable_mem_consumption( - std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>* - writers_mem_snap) { - std::lock_guard<SpinLock> l(_tablets_channels_lock); - for (auto& it : _tablets_channels) { - std::multimap<int64_t, int64_t, std::greater<int64_t>> tablets_channel_mem; - it.second->get_active_memtable_mem_consumption(&tablets_channel_mem); - writers_mem_snap->emplace_back(it.first, std::move(tablets_channel_mem)); - } - } - int64_t timeout() const { return _timeout_s; } bool is_high_priority() const { return _is_high_priority; } - void flush_memtable_async(int64_t index_id, int64_t tablet_id) { - std::lock_guard<std::mutex> l(_lock); - auto it = _tablets_channels.find(index_id); - if (it != _tablets_channels.end()) { - it->second->flush_memtable_async(tablet_id); - } - } - - void wait_flush(int64_t index_id, int64_t tablet_id) { - std::lock_guard<std::mutex> l(_lock); - auto it = _tablets_channels.find(index_id); - if (it != _tablets_channels.end()) { - it->second->wait_flush(tablet_id); - } - } - RuntimeProfile::Counter* get_mgr_add_batch_timer() { return _mgr_add_batch_timer; } RuntimeProfile::Counter* get_handle_mem_limit_timer() { return _handle_mem_limit_timer; } + std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> get_tablets_channels() { + return 
_tablets_channels; + } + protected: Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, bool& is_finished, const int64_t index_id); @@ -138,7 +104,14 @@ protected: std::lock_guard<std::mutex> l(_lock); { std::lock_guard<SpinLock> l(_tablets_channels_lock); - _tablets_channels.erase(index_id); + auto memtable_memory_limiter = ExecEnv::GetInstance()->memtable_memory_limiter(); + auto tablet_channel_it = _tablets_channels.find(index_id); + if (tablet_channel_it != _tablets_channels.end()) { + for (auto& writer_it : tablet_channel_it->second->get_tablet_writers()) { + memtable_memory_limiter->deregister_writer(writer_it.second); + } + _tablets_channels.erase(index_id); + } } _finished_channel_ids.emplace(index_id); } @@ -151,8 +124,6 @@ protected: private: UniqueId _load_id; - // Tracks the total memory consumed by current load job on this BE - std::unique_ptr<MemTracker> _mem_tracker; std::unique_ptr<RuntimeProfile> _profile; RuntimeProfile* _self_profile; @@ -167,6 +138,7 @@ private: // lock protect the tablets channel map std::mutex _lock; // index id -> tablets channel + // when you erase, you should call deregister_writer method in MemTableMemoryLimiter; std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> _tablets_channels; SpinLock _tablets_channels_lock; // This is to save finished channels id, to handle the retry request. 
@@ -192,7 +164,7 @@ private: }; inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) { - os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << load_channel.mem_consumption() + os << "LoadChannel(id=" << load_channel.load_id() << ", last_update_time=" << static_cast<uint64_t>(load_channel.last_updated_time()) << ", is high priority: " << load_channel.is_high_priority() << ")"; return os; diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 6862b99b2f..df77840c56 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -57,16 +57,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(load_channel_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(load_channel_mem_consumption, MetricUnit::BYTES, "", mem_consumption, Labels({{"type", "load"}})); -// Calculate the total memory limit of all load tasks on this BE -static int64_t calc_process_max_load_memory(int64_t process_mem_limit) { - if (process_mem_limit == -1) { - // no limit - return -1; - } - int32_t max_load_memory_percent = config::load_process_max_memory_limit_percent; - return process_mem_limit * max_load_memory_percent / 100; -} - static int64_t calc_channel_timeout_s(int64_t timeout_in_req_s) { int64_t load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; if (timeout_in_req_s > 0) { @@ -93,12 +83,7 @@ LoadChannelMgr::~LoadChannelMgr() { } Status LoadChannelMgr::init(int64_t process_mem_limit) { - _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit); - _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100; - _mem_tracker = - std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD, "LoadChannelMgr"); - REGISTER_HOOK_METRIC(load_channel_mem_consumption, - [this]() { return _mem_tracker->consumption(); }); + _memtable_memory_limiter = ExecEnv::GetInstance()->memtable_memory_limiter(); _last_success_channel = 
new_lru_cache("LastestSuccessChannelCache", 1024); RETURN_IF_ERROR(_start_bg_worker()); return Status::OK(); @@ -119,24 +104,18 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { int64_t channel_timeout_s = calc_channel_timeout_s(timeout_in_req_s); bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority()); - // Use the same mem limit as LoadChannelMgr for a single load channel -#ifndef BE_TEST - auto channel_mem_tracker = std::make_unique<MemTracker>( - fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(), - load_id.to_string()), - ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); -#else - auto channel_mem_tracker = std::make_unique<MemTracker>(fmt::format( - "LoadChannel#senderIp={}#loadID={}", params.sender_ip(), load_id.to_string())); -#endif - channel.reset(new LoadChannel(load_id, std::move(channel_mem_tracker), - channel_timeout_s, is_high_priority, params.sender_ip(), - params.backend_id(), params.enable_profile())); + channel.reset(new LoadChannel(load_id, channel_timeout_s, is_high_priority, + params.sender_ip(), params.backend_id(), + params.enable_profile())); _load_channels.insert({load_id, channel}); } } RETURN_IF_ERROR(channel->open(params)); + { + std::lock_guard<std::mutex> l(_lock); + _register_channel_all_writers(channel); + } return Status::OK(); } @@ -182,7 +161,7 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request, // If this is a high priority load task, do not handle this. // because this may block for a while, which may lead to rpc timeout. SCOPED_TIMER(channel->get_handle_mem_limit_timer()); - _handle_mem_exceed_limit(); + _memtable_memory_limiter->handle_memtable_flush(); } // 3. 
add batch to load channel @@ -205,7 +184,10 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) { VLOG_NOTICE << "removing load channel " << load_id << " because it's finished"; { std::lock_guard<std::mutex> l(_lock); - _load_channels.erase(load_id); + if (_load_channels.find(load_id) != _load_channels.end()) { + _deregister_channel_all_writers(_load_channels.find(load_id)->second); + _load_channels.erase(load_id); + } auto handle = _last_success_channel->insert(load_id.to_string(), nullptr, 1, dummy_deleter); _last_success_channel->release(handle); } @@ -219,6 +201,7 @@ Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) { std::lock_guard<std::mutex> l(_lock); if (_load_channels.find(load_id) != _load_channels.end()) { cancelled_channel = _load_channels[load_id]; + _deregister_channel_all_writers(cancelled_channel); _load_channels.erase(load_id); } } @@ -263,6 +246,7 @@ Status LoadChannelMgr::_start_load_channels_clean() { } for (auto& key : need_delete_channel_ids) { + _deregister_channel_all_writers(_load_channels.find(key)->second); _load_channels.erase(key); LOG(INFO) << "erase timeout load channel: " << key; } @@ -277,175 +261,6 @@ Status LoadChannelMgr::_start_load_channels_clean() { << ", timeout(s): " << channel->timeout(); } - // this log print every 1 min, so that we could observe the mem consumption of load process - // on this Backend - LOG(INFO) << "load mem consumption(bytes). limit: " << _load_hard_mem_limit - << ", current: " << _mem_tracker->consumption() - << ", peak: " << _mem_tracker->peak_consumption() - << ", total running load channels: " << _load_channels.size(); - return Status::OK(); } - -void LoadChannelMgr::_handle_mem_exceed_limit() { - // Check the soft limit. - DCHECK(_load_soft_mem_limit > 0); - // Record current memory status. 
- int64_t process_soft_mem_limit = MemInfo::soft_mem_limit(); - int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache(); - // If process memory is almost full but data load don't consume more than 5% (50% * 10%) of - // total memory, we don't need to reduce memory of load jobs. - bool reduce_on_process_soft_mem_limit = - proc_mem_no_allocator_cache >= process_soft_mem_limit && - _mem_tracker->consumption() >= _load_hard_mem_limit / 10; - if (_mem_tracker->consumption() < _load_soft_mem_limit && !reduce_on_process_soft_mem_limit) { - return; - } - // Indicate whether current thread is reducing mem on hard limit. - bool reducing_mem_on_hard_limit = false; - // tuple<LoadChannel, index_id, tablet_id, mem_size> - std::vector<std::tuple<std::shared_ptr<LoadChannel>, int64_t, int64_t, int64_t>> - writers_to_reduce_mem; - { - MonotonicStopWatch timer; - timer.start(); - std::unique_lock<std::mutex> l(_lock); - while (_should_wait_flush) { - _wait_flush_cond.wait(l); - } - LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit - << ", waited for flush, time_ns:" << timer.elapsed_time(); - - bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit || - proc_mem_no_allocator_cache >= process_soft_mem_limit; - // Some other thread is flushing data, and not reached hard limit now, - // we don't need to handle mem limit in current thread. 
- if (_soft_reduce_mem_in_progress && !hard_limit_reached) { - return; - } - - // tuple<LoadChannel, index_id, multimap<mem size, tablet_id>> - using WritersMem = std::tuple<std::shared_ptr<LoadChannel>, int64_t, - std::multimap<int64_t, int64_t, std::greater<int64_t>>>; - std::vector<WritersMem> all_writers_mem; - - // tuple<current iterator in multimap, end iterator in multimap, pos in all_writers_mem> - using WriterMemItem = - std::tuple<std::multimap<int64_t, int64_t, std::greater<int64_t>>::iterator, - std::multimap<int64_t, int64_t, std::greater<int64_t>>::iterator, - size_t>; - auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { - return std::get<0>(lhs)->first < std::get<0>(rhs)->first; - }; - std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> - tablets_mem_heap(cmp); - - for (auto& kv : _load_channels) { - if (kv.second->is_high_priority()) { - // do not select high priority channel to reduce memory - // to avoid blocking the. - continue; - } - std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>> - writers_mem_snap; - kv.second->get_active_memtable_mem_consumption(&writers_mem_snap); - for (auto item : writers_mem_snap) { - // multimap is empty - if (item.second.empty()) { - continue; - } - all_writers_mem.emplace_back(kv.second, item.first, std::move(item.second)); - } - } - for (size_t i = 0; i < all_writers_mem.size(); i++) { - tablets_mem_heap.emplace(std::get<2>(all_writers_mem[i]).begin(), - std::get<2>(all_writers_mem[i]).end(), i); - } - - // reduce 1/10 memory every time - int64_t mem_to_flushed = _mem_tracker->consumption() / 10; - int64_t mem_consumption_in_picked_writer = 0; - while (!tablets_mem_heap.empty()) { - WriterMemItem tablet_mem_item = tablets_mem_heap.top(); - size_t pos = std::get<2>(tablet_mem_item); - auto load_channel = std::get<0>(all_writers_mem[pos]); - int64_t index_id = std::get<1>(all_writers_mem[pos]); - int64_t tablet_id = 
std::get<0>(tablet_mem_item)->second; - int64_t mem_size = std::get<0>(tablet_mem_item)->first; - writers_to_reduce_mem.emplace_back(load_channel, index_id, tablet_id, mem_size); - load_channel->flush_memtable_async(index_id, tablet_id); - mem_consumption_in_picked_writer += std::get<0>(tablet_mem_item)->first; - if (mem_consumption_in_picked_writer > mem_to_flushed) { - break; - } - tablets_mem_heap.pop(); - if (++std::get<0>(tablet_mem_item) != std::get<1>(tablet_mem_item)) { - tablets_mem_heap.push(tablet_mem_item); - } - } - - if (writers_to_reduce_mem.empty()) { - // should not happen, add log to observe - LOG(WARNING) << "failed to find suitable writers to reduce memory" - << " when total load mem limit exceed"; - return; - } - - std::ostringstream oss; - oss << "reducing memory of " << writers_to_reduce_mem.size() - << " delta writers (total mem: " - << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) << ", max mem: " - << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.front())) - << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.front()) - << ", min mem:" << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.back())) - << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.back()) << "), "; - if (proc_mem_no_allocator_cache < process_soft_mem_limit) { - oss << "because total load mem consumption " - << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded"; - if (_mem_tracker->consumption() > _load_hard_mem_limit) { - _should_wait_flush = true; - reducing_mem_on_hard_limit = true; - oss << " hard limit: " << PrettyPrinter::print_bytes(_load_hard_mem_limit); - } else { - _soft_reduce_mem_in_progress = true; - oss << " soft limit: " << PrettyPrinter::print_bytes(_load_soft_mem_limit); - } - } else { - _should_wait_flush = true; - reducing_mem_on_hard_limit = true; - oss << "because proc_mem_no_allocator_cache consumption " - << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache) - << ", has 
exceeded process soft limit " - << PrettyPrinter::print_bytes(process_soft_mem_limit) - << ", total load mem consumption: " - << PrettyPrinter::print_bytes(_mem_tracker->consumption()) - << ", vm_rss: " << PerfCounters::get_vm_rss_str(); - } - LOG(INFO) << oss.str(); - } - - // wait all writers flush without lock - for (auto item : writers_to_reduce_mem) { - VLOG_NOTICE << "reducing memory, wait flush load_id: " << std::get<0>(item)->load_id() - << ", index_id: " << std::get<1>(item) << ", tablet_id: " << std::get<2>(item) - << ", mem_size: " << PrettyPrinter::print_bytes(std::get<3>(item)); - std::get<0>(item)->wait_flush(std::get<1>(item), std::get<2>(item)); - } - - { - std::lock_guard<std::mutex> l(_lock); - // If a thread have finished the memtable flush for soft limit, and now - // the hard limit is already reached, it should not update these variables. - if (reducing_mem_on_hard_limit && _should_wait_flush) { - _should_wait_flush = false; - _wait_flush_cond.notify_all(); - } - if (_soft_reduce_mem_in_progress) { - _soft_reduce_mem_in_progress = false; - } - // refresh mem tacker to avoid duplicate reduce - _refresh_mem_tracker_without_lock(); - } -} - } // namespace doris diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index cad611b75b..db137aa5c0 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -30,6 +30,7 @@ #include "common/status.h" #include "gutil/ref_counted.h" #include "olap/lru_cache.h" +#include "olap/memtable_memory_limiter.h" #include "runtime/load_channel.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" @@ -60,51 +61,39 @@ public: // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); - void refresh_mem_tracker() { - std::lock_guard<std::mutex> l(_lock); - _refresh_mem_tracker_without_lock(); - } - MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); } - private: Status 
_get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof, const UniqueId& load_id, const PTabletWriterAddBlockRequest& request); void _finish_load_channel(UniqueId load_id); - // check if the total load mem consumption exceeds limit. - // If yes, it will pick a load channel to try to reduce memory consumption. - void _handle_mem_exceed_limit(); Status _start_bg_worker(); - // lock should be held when calling this method - void _refresh_mem_tracker_without_lock() { - _mem_usage = 0; - for (auto& kv : _load_channels) { - _mem_usage += kv.second->mem_consumption(); + void _register_channel_all_writers(std::shared_ptr<doris::LoadChannel> channel) { + for (auto& tablet_channel_it : channel->get_tablets_channels()) { + for (auto& writer_it : tablet_channel_it.second->get_tablet_writers()) { + _memtable_memory_limiter->register_writer(writer_it.second); + } + } + } + + void _deregister_channel_all_writers(std::shared_ptr<doris::LoadChannel> channel) { + for (auto& tablet_channel_it : channel->get_tablets_channels()) { + for (auto& writer_it : tablet_channel_it.second->get_tablet_writers()) { + _memtable_memory_limiter->deregister_writer(writer_it.second); + } } - THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), - _mem_tracker.get()); } protected: // lock protect the load channel map std::mutex _lock; // load id -> load channel + // when you erase, you should call deregister_writer method in MemTableMemoryLimiter ; std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels; Cache* _last_success_channel = nullptr; - // check the total load channel mem consumption of this Backend - int64_t _mem_usage = 0; - std::unique_ptr<MemTrackerLimiter> _mem_tracker; - int64_t _load_hard_mem_limit = -1; - int64_t _load_soft_mem_limit = -1; - bool _soft_reduce_mem_in_progress = false; - - // If hard limit reached, one thread will trigger load channel flush, - // other threads should wait on the condition variable. 
- bool _should_wait_flush = false; - std::condition_variable _wait_flush_cond; + MemTableMemoryLimiter* _memtable_memory_limiter = nullptr; CountDownLatch _stop_background_threads_latch; // thread to clean timeout load channels diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 517589f767..610af53c77 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -26,9 +26,9 @@ #include <queue> #include <utility> +#include "olap/memtable_memory_limiter.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" -#include "runtime/load_channel_mgr.h" #include "runtime/task_group/task_group.h" #include "service/backend_options.h" #include "util/mem_info.h" @@ -240,7 +240,7 @@ std::string MemTrackerLimiter::log_process_usage_str() { // Add additional tracker printed when memory exceeds limit. snapshots.emplace_back( - ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()->make_snapshot()); + ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->make_snapshot()); detail += "\nMemory Tracker Summary:"; for (const auto& snapshot : snapshots) { diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 6d94b00007..7f45b404b7 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -291,18 +291,6 @@ void TabletsChannel::_add_error_tablet( << "err msg " << error; } -int64_t TabletsChannel::mem_consumption() { - int64_t mem_usage = 0; - { - std::lock_guard<SpinLock> l(_tablet_writers_lock); - for (auto& it : _tablet_writers) { - int64_t writer_mem = it.second->mem_consumption(MemType::ALL); - mem_usage += writer_mem; - } - } - return mem_usage; -} - void TabletsChannel::refresh_profile() { int64_t write_mem_usage = 0; int64_t flush_mem_usage = 0; @@ -330,16 +318,6 @@ void TabletsChannel::refresh_profile() { COUNTER_SET(_max_tablet_flush_memory_usage_counter, 
max_tablet_flush_mem_usage); } -void TabletsChannel::get_active_memtable_mem_consumption( - std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions) { - mem_consumptions->clear(); - std::lock_guard<SpinLock> l(_tablet_writers_lock); - for (auto& it : _tablet_writers) { - int64_t active_memtable_mem = it.second->active_memtable_mem_consumption(); - mem_consumptions->emplace(active_memtable_mem, it.first); - } -} - Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) { std::vector<SlotDescriptor*>* index_slots = nullptr; int32_t schema_hash = 0; @@ -501,69 +479,6 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, return Status::OK(); } -void TabletsChannel::flush_memtable_async(int64_t tablet_id) { - std::lock_guard<std::mutex> l(_lock); - if (_state == kFinished) { - // TabletsChannel is closed without LoadChannel's lock, - // therefore it's possible for reduce_mem_usage() to be called right after close() - LOG(INFO) << "TabletsChannel is closed when reduce mem usage, txn_id: " << _txn_id - << ", index_id: " << _index_id; - return; - } - - auto iter = _tablet_writers.find(tablet_id); - if (iter == _tablet_writers.end()) { - return; - } - - if (!(_reducing_tablets.insert(tablet_id).second)) { - return; - } - - Status st = iter->second->flush_memtable_and_wait(false); - if (!st.ok()) { - auto err_msg = fmt::format( - "tablet writer failed to reduce mem consumption by flushing memtable, " - "tablet_id={}, txn_id={}, err={}", - tablet_id, _txn_id, st.to_string()); - LOG(WARNING) << err_msg; - iter->second->cancel_with_status(st); - _add_broken_tablet(tablet_id); - } -} - -void TabletsChannel::wait_flush(int64_t tablet_id) { - { - std::lock_guard<std::mutex> l(_lock); - if (_state == kFinished) { - // TabletsChannel is closed without LoadChannel's lock, - // therefore it's possible for reduce_mem_usage() to be called right after close() - LOG(INFO) << "TabletsChannel is closed when reduce mem 
usage, txn_id: " << _txn_id - << ", index_id: " << _index_id; - return; - } - } - - auto iter = _tablet_writers.find(tablet_id); - if (iter == _tablet_writers.end()) { - return; - } - Status st = iter->second->wait_flush(); - if (!st.ok()) { - auto err_msg = fmt::format( - "tablet writer failed to reduce mem consumption by flushing memtable, " - "tablet_id={}, txn_id={}, err={}", - tablet_id, _txn_id, st.to_string()); - LOG(WARNING) << err_msg; - iter->second->cancel_with_status(st); - _add_broken_tablet(tablet_id); - } - - { - std::lock_guard<std::mutex> l(_lock); - _reducing_tablets.erase(tablet_id); - } -} void TabletsChannel::_add_broken_tablet(int64_t tablet_id) { std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock); _broken_tablets.insert(tablet_id); diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index b31aaee214..17bdcf5ec8 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -110,15 +110,9 @@ public: // no-op when this channel has been closed or cancelled Status cancel(); - int64_t mem_consumption(); - void refresh_profile(); - void get_active_memtable_mem_consumption( - std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions); - - void flush_memtable_async(int64_t tablet_id); - void wait_flush(int64_t tablet_id); + std::unordered_map<int64_t, DeltaWriter*> get_tablet_writers() { return _tablet_writers; } private: template <typename Request> @@ -175,6 +169,7 @@ private: std::map<int64, int64> _tablet_partition_map; // tablet_id -> TabletChannel + // when you erase, you should call deregister_writer method in MemTableMemoryLimiter; std::unordered_map<int64_t, DeltaWriter*> _tablet_writers; // broken tablet ids. // If a tablet write fails, it's id will be added to this set. 
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 646a4449c0..39036d4589 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -190,6 +190,7 @@ public: UIntGauge* compaction_mem_consumption; UIntGauge* load_mem_consumption; UIntGauge* load_channel_mem_consumption; + UIntGauge* memtable_memory_limiter_mem_consumption; UIntGauge* query_mem_consumption; UIntGauge* schema_change_mem_consumption; UIntGauge* storage_migration_mem_consumption; diff --git a/be/test/olap/memtable_memory_limiter_test.cpp b/be/test/olap/memtable_memory_limiter_test.cpp new file mode 100644 index 0000000000..7b49b22b32 --- /dev/null +++ b/be/test/olap/memtable_memory_limiter_test.cpp @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/memtable_memory_limiter.h" + +#include "exec/tablet_info.h" +#include "gtest/gtest_pred_impl.h" +#include "olap/delta_writer.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "runtime/descriptor_helper.h" +#include "runtime/descriptors.h" + +namespace doris { +static const uint32_t MAX_PATH_LEN = 1024; + +static void create_tablet_request(int64_t tablet_id, int32_t schema_hash, + TCreateTabletReq* request) { + request->tablet_id = tablet_id; + request->__set_version(1); + request->tablet_schema.schema_hash = schema_hash; + request->tablet_schema.short_key_column_count = 3; + request->tablet_schema.keys_type = TKeysType::AGG_KEYS; + request->tablet_schema.storage_type = TStorageType::COLUMN; + request->__set_storage_format(TStorageFormat::V2); + + TColumn k1; + k1.column_name = "k1"; + k1.__set_is_key(true); + k1.column_type.type = TPrimitiveType::TINYINT; + request->tablet_schema.columns.push_back(k1); + + TColumn k2; + k2.column_name = "k2"; + k2.__set_is_key(true); + k2.column_type.type = TPrimitiveType::SMALLINT; + request->tablet_schema.columns.push_back(k2); + + TColumn k3; + k3.column_name = "k3"; + k3.__set_is_key(true); + k3.column_type.type = TPrimitiveType::INT; + request->tablet_schema.columns.push_back(k3); +} + +static TDescriptorTable create_descriptor_tablet() { + TDescriptorTableBuilder dtb; + TTupleDescriptorBuilder tuple_builder; + + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("k1").column_pos(0).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("k2").column_pos(1).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_INT).column_name("k3").column_pos(2).build()); + + tuple_builder.build(&dtb); + return dtb.desc_tbl(); +} + +class MemTableMemoryLimiterTest : public testing::Test { +protected: + void SetUp() override { + // set path + char buffer[MAX_PATH_LEN]; + EXPECT_NE(getcwd(buffer, 
MAX_PATH_LEN), nullptr); + config::storage_root_path = std::string(buffer) + "/data_test"; + io::global_local_filesystem()->delete_and_create_directory(config::storage_root_path); + std::vector<StorePath> paths; + paths.emplace_back(config::storage_root_path, -1); + + _mgr = new MemTableMemoryLimiter(); + doris::EngineOptions options; + options.store_paths = paths; + Status s = doris::StorageEngine::open(options, &_engine); + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_storage_engine(_engine); + _engine->start_bg_threads(); + } + + void TearDown() override { + if (_engine != nullptr) { + _engine->stop(); + delete _engine; + _engine = nullptr; + } + if (_mgr != nullptr) { + delete _mgr; + _mgr = nullptr; + } + EXPECT_EQ(system("rm -rf ./data_test"), 0); + io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME")) + "/" + + UNUSED_PREFIX); + } + + StorageEngine* _engine = nullptr; + MemTableMemoryLimiter* _mgr = nullptr; +}; + +TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) { + TCreateTabletReq request; + create_tablet_request(10000, 270068372, &request); + Status res = _engine->create_tablet(request); + ASSERT_TRUE(res.ok()); + + TDescriptorTable tdesc_tbl = create_descriptor_tablet(); + ObjectPool obj_pool; + DescriptorTbl* desc_tbl = nullptr; + DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + OlapTableSchemaParam param; + + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + WriteRequest write_req = { + 10000, 270068372, 20002, 30002, load_id, tuple_desc, &(tuple_desc->slots()), + false, ¶m}; + DeltaWriter* delta_writer = nullptr; + std::unique_ptr<RuntimeProfile> profile; + profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest"); + DeltaWriter::open(&write_req, &delta_writer, profile.get(), TUniqueId()); + ASSERT_NE(delta_writer, nullptr); + + vectorized::Block block; + for (const auto& slot_desc : 
tuple_desc->slots()) { + block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + auto columns = block.mutate_columns(); + { + int8_t k1 = -127; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + + int16_t k2 = -32767; + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + + int32_t k3 = -2147483647; + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + + res = delta_writer->write(&block, {0}); + ASSERT_TRUE(res.ok()); + } + std::mutex lock; + _mgr->init(100); + { + std::lock_guard<std::mutex> l(lock); + _mgr->register_writer(delta_writer); + } + _mgr->handle_memtable_flush(); + CHECK_EQ(0, delta_writer->active_memtable_mem_consumption()); + { + std::lock_guard<std::mutex> l(lock); + _mgr->deregister_writer(delta_writer); + } + + res = delta_writer->close(); + EXPECT_EQ(Status::OK(), res); + res = delta_writer->build_rowset(); + EXPECT_EQ(Status::OK(), res); + res = delta_writer->commit_txn(PSlaveTabletNodes(), false); + EXPECT_EQ(Status::OK(), res); + res = _engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); + EXPECT_EQ(Status::OK(), res); + delete delta_writer; +} +} // namespace doris \ No newline at end of file diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 8ab61da608..fe00733df9 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -958,6 +958,11 @@ BaseCompaction:546859: * Description: Whether to use mmap to allocate memory * Default value: false +#### `memtable_mem_tracker_refresh_interval_ms` + +* Description: Interval in milliseconds between memtable flush mgr refresh iterations +* Default value: 100 + #### `download_cache_buffer_size` * Type: int64 diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index c044319a8d..170a1f785b 
100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -971,6 +971,11 @@ BaseCompaction:546859: * 描述:是否使用mmap分配内存 * 默认值:false +#### `memtable_mem_tracker_refresh_interval_ms` + +* 描述:memtable主动下刷时刷新内存统计的周期(毫秒) +* 默认值:100 + #### `download_cache_buffer_size` * 类型: int64 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org