github-actions[bot] commented on code in PR #25653: URL: https://github.com/apache/doris/pull/25653#discussion_r1369904458
########## be/src/olap/memtable_memory_limiter.cpp: ########## @@ -59,135 +59,81 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer _writers.push_back(writer); } +bool MemTableMemoryLimiter::_soft_limit_reached() { + return _mem_tracker->consumption() >= _load_soft_mem_limit || + (MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit() && + _mem_tracker->consumption() >= _load_hard_mem_limit / 10); +} + +bool MemTableMemoryLimiter::_hard_limit_reached() { + return _mem_tracker->consumption() >= _load_hard_mem_limit || + MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit(); +} + void MemTableMemoryLimiter::handle_memtable_flush() { // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); - // Record current memory status. - int64_t process_soft_mem_limit = MemInfo::soft_mem_limit(); - int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache(); -#ifndef BE_TEST - // If process memory is almost full but data load don't consume more than 5% (50% * 10%) of - // total memory, we don't need to flush memtable. - bool reduce_on_process_soft_mem_limit = - proc_mem_no_allocator_cache >= process_soft_mem_limit && - _mem_tracker->consumption() >= _load_hard_mem_limit / 10; - if (_mem_tracker->consumption() < _load_soft_mem_limit && !reduce_on_process_soft_mem_limit) { + if (!_soft_limit_reached()) { return; } -#endif - // Indicate whether current thread is reducing mem on hard limit. - bool reducing_mem_on_hard_limit = false; - Status st; - std::vector<WriterMemItem> writers_to_reduce_mem; { MonotonicStopWatch timer; timer.start(); std::unique_lock<std::mutex> l(_lock); - while (_should_wait_flush) { - _wait_flush_cond.wait(l); - } - LOG(INFO) << "Reached the one tenth of load hard limit " << _load_hard_mem_limit / 10 - << "and process remaining allocator cache " << proc_mem_no_allocator_cache - << "reached process soft memory limit " << process_soft_mem_limit - << ", waited for flush, time_ns:" << timer.elapsed_time(); -#ifndef BE_TEST - bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit || - proc_mem_no_allocator_cache >= process_soft_mem_limit; - // Some other thread is flushing data, and not reached hard limit now, - // we don't need to handle mem limit in current thread. - if (_soft_reduce_mem_in_progress && !hard_limit_reached) { - return; - } -#endif - - auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { - return lhs.mem_size < rhs.mem_size; - }; - std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> mem_heap(cmp); - - for (auto it = _writers.begin(); it != _writers.end();) { - if (auto writer = it->lock()) { - int64_t active_memtable_mem = writer->active_memtable_mem_consumption(); - mem_heap.emplace(writer, active_memtable_mem); - ++it; - } else { - *it = std::move(_writers.back()); - _writers.pop_back(); + while (_soft_limit_reached()) { + LOG(INFO) << "reached memtable memory soft limit" + << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; + if (_active_mem_usage >= _load_soft_mem_limit || + (_active_mem_usage > _load_hard_mem_limit / 10 && _hard_limit_reached())) { + _flush_active_memtables(); } + _soft_limit_end_cond.wait(l); } - int64_t mem_to_flushed = _mem_tracker->consumption() / 10; - int64_t mem_consumption_in_picked_writer = 0; - while (!mem_heap.empty()) { - WriterMemItem mem_item = mem_heap.top(); - mem_heap.pop(); - auto writer = mem_item.writer.lock(); - if (!writer) { - continue; - } - int64_t mem_size = mem_item.mem_size; - writers_to_reduce_mem.emplace_back(writer, mem_size); - st = writer->flush_async(); - if (!st.ok()) { - auto err_msg = fmt::format( - "tablet writer failed to reduce mem consumption by flushing memtable, " - "tablet_id={}, err={}", - writer->tablet_id(), st.to_string()); - LOG(WARNING) << err_msg; - static_cast<void>(writer->cancel_with_status(st)); - } - mem_consumption_in_picked_writer += mem_size; - if (mem_consumption_in_picked_writer > mem_to_flushed) { - break; - } - } - if (writers_to_reduce_mem.empty()) { - // should not happen, add log to observe - LOG(WARNING) << "failed to find suitable writers to reduce memory" - << " when total load mem limit exceed"; - return; - } + timer.stop(); + int64_t time_ms = timer.elapsed_time() / 1000 / 1000; Review Comment: warning: 1000 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp int64_t time_ms = timer.elapsed_time() / 1000 / 1000; ^ ``` ########## be/src/olap/memtable_memory_limiter.cpp: ########## @@ -59,135 +59,81 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer _writers.push_back(writer); } +bool MemTableMemoryLimiter::_soft_limit_reached() { + return _mem_tracker->consumption() >= _load_soft_mem_limit || + (MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit() && + _mem_tracker->consumption() >= _load_hard_mem_limit / 10); +} + +bool MemTableMemoryLimiter::_hard_limit_reached() { + return _mem_tracker->consumption() >= _load_hard_mem_limit || + MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit(); +} + void MemTableMemoryLimiter::handle_memtable_flush() { // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); - // Record current memory status. - int64_t process_soft_mem_limit = MemInfo::soft_mem_limit(); - int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache(); -#ifndef BE_TEST - // If process memory is almost full but data load don't consume more than 5% (50% * 10%) of - // total memory, we don't need to flush memtable. - bool reduce_on_process_soft_mem_limit = - proc_mem_no_allocator_cache >= process_soft_mem_limit && - _mem_tracker->consumption() >= _load_hard_mem_limit / 10; - if (_mem_tracker->consumption() < _load_soft_mem_limit && !reduce_on_process_soft_mem_limit) { + if (!_soft_limit_reached()) { return; } -#endif - // Indicate whether current thread is reducing mem on hard limit. - bool reducing_mem_on_hard_limit = false; - Status st; - std::vector<WriterMemItem> writers_to_reduce_mem; { MonotonicStopWatch timer; timer.start(); std::unique_lock<std::mutex> l(_lock); - while (_should_wait_flush) { - _wait_flush_cond.wait(l); - } - LOG(INFO) << "Reached the one tenth of load hard limit " << _load_hard_mem_limit / 10 - << "and process remaining allocator cache " << proc_mem_no_allocator_cache - << "reached process soft memory limit " << process_soft_mem_limit - << ", waited for flush, time_ns:" << timer.elapsed_time(); -#ifndef BE_TEST - bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit || - proc_mem_no_allocator_cache >= process_soft_mem_limit; - // Some other thread is flushing data, and not reached hard limit now, - // we don't need to handle mem limit in current thread. - if (_soft_reduce_mem_in_progress && !hard_limit_reached) { - return; - } -#endif - - auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { - return lhs.mem_size < rhs.mem_size; - }; - std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> mem_heap(cmp); - - for (auto it = _writers.begin(); it != _writers.end();) { - if (auto writer = it->lock()) { - int64_t active_memtable_mem = writer->active_memtable_mem_consumption(); - mem_heap.emplace(writer, active_memtable_mem); - ++it; - } else { - *it = std::move(_writers.back()); - _writers.pop_back(); + while (_soft_limit_reached()) { + LOG(INFO) << "reached memtable memory soft limit" + << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; + if (_active_mem_usage >= _load_soft_mem_limit || + (_active_mem_usage > _load_hard_mem_limit / 10 && _hard_limit_reached())) { Review Comment: warning: 10 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp (_active_mem_usage > _load_hard_mem_limit / 10 && _hard_limit_reached())) { ^ ``` ########## be/src/olap/memtable_memory_limiter.cpp: ########## @@ -59,135 +59,81 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer _writers.push_back(writer); } +bool MemTableMemoryLimiter::_soft_limit_reached() { + return _mem_tracker->consumption() >= _load_soft_mem_limit || + (MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit() && + _mem_tracker->consumption() >= _load_hard_mem_limit / 10); +} + +bool MemTableMemoryLimiter::_hard_limit_reached() { + return _mem_tracker->consumption() >= _load_hard_mem_limit || + MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit(); +} + void MemTableMemoryLimiter::handle_memtable_flush() { // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); - // Record current memory status. - int64_t process_soft_mem_limit = MemInfo::soft_mem_limit(); - int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache(); -#ifndef BE_TEST - // If process memory is almost full but data load don't consume more than 5% (50% * 10%) of - // total memory, we don't need to flush memtable. - bool reduce_on_process_soft_mem_limit = - proc_mem_no_allocator_cache >= process_soft_mem_limit && - _mem_tracker->consumption() >= _load_hard_mem_limit / 10; - if (_mem_tracker->consumption() < _load_soft_mem_limit && !reduce_on_process_soft_mem_limit) { + if (!_soft_limit_reached()) { return; } -#endif - // Indicate whether current thread is reducing mem on hard limit. - bool reducing_mem_on_hard_limit = false; - Status st; - std::vector<WriterMemItem> writers_to_reduce_mem; { MonotonicStopWatch timer; timer.start(); std::unique_lock<std::mutex> l(_lock); - while (_should_wait_flush) { - _wait_flush_cond.wait(l); - } - LOG(INFO) << "Reached the one tenth of load hard limit " << _load_hard_mem_limit / 10 - << "and process remaining allocator cache " << proc_mem_no_allocator_cache - << "reached process soft memory limit " << process_soft_mem_limit - << ", waited for flush, time_ns:" << timer.elapsed_time(); -#ifndef BE_TEST - bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit || - proc_mem_no_allocator_cache >= process_soft_mem_limit; - // Some other thread is flushing data, and not reached hard limit now, - // we don't need to handle mem limit in current thread. - if (_soft_reduce_mem_in_progress && !hard_limit_reached) { - return; - } -#endif - - auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { - return lhs.mem_size < rhs.mem_size; - }; - std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> mem_heap(cmp); - - for (auto it = _writers.begin(); it != _writers.end();) { - if (auto writer = it->lock()) { - int64_t active_memtable_mem = writer->active_memtable_mem_consumption(); - mem_heap.emplace(writer, active_memtable_mem); - ++it; - } else { - *it = std::move(_writers.back()); - _writers.pop_back(); + while (_soft_limit_reached()) { + LOG(INFO) << "reached memtable memory soft limit" + << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; + if (_active_mem_usage >= _load_soft_mem_limit || + (_active_mem_usage > _load_hard_mem_limit / 10 && _hard_limit_reached())) { + _flush_active_memtables(); } + _soft_limit_end_cond.wait(l); } - int64_t mem_to_flushed = _mem_tracker->consumption() / 10; - int64_t mem_consumption_in_picked_writer = 0; - while (!mem_heap.empty()) { - WriterMemItem mem_item = mem_heap.top(); - mem_heap.pop(); - auto writer = mem_item.writer.lock(); - if (!writer) { - continue; - } - int64_t mem_size = mem_item.mem_size; - writers_to_reduce_mem.emplace_back(writer, mem_size); - st = writer->flush_async(); - if (!st.ok()) { - auto err_msg = fmt::format( - "tablet writer failed to reduce mem consumption by flushing memtable, " - "tablet_id={}, err={}", - writer->tablet_id(), st.to_string()); - LOG(WARNING) << err_msg; - static_cast<void>(writer->cancel_with_status(st)); - } - mem_consumption_in_picked_writer += mem_size; - if (mem_consumption_in_picked_writer > mem_to_flushed) { - break; - } - } - if (writers_to_reduce_mem.empty()) { - // should not happen, add log to observe - LOG(WARNING) << "failed to find suitable writers to reduce memory" - << " when total load mem limit exceed"; - return; - } + timer.stop(); + int64_t time_ms = timer.elapsed_time() / 1000 / 1000; + LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit"; + } +} - std::ostringstream oss; - oss << "reducing memory of " << writers_to_reduce_mem.size() - << " memtable writers (total mem: " - << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) - << ", max mem: " << PrettyPrinter::print_bytes(writers_to_reduce_mem.front().mem_size) - << ", min mem:" << PrettyPrinter::print_bytes(writers_to_reduce_mem.back().mem_size) - << "), "; - if (proc_mem_no_allocator_cache < process_soft_mem_limit) { - oss << "because total load mem consumption " - << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded"; - if (_mem_tracker->consumption() > _load_hard_mem_limit) { - _should_wait_flush = true; - reducing_mem_on_hard_limit = true; - oss << " hard limit: " << PrettyPrinter::print_bytes(_load_hard_mem_limit); - } else { - _soft_reduce_mem_in_progress = true; - oss << " soft limit: " << PrettyPrinter::print_bytes(_load_soft_mem_limit); - } +void MemTableMemoryLimiter::_flush_active_memtables() { + auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { return lhs.mem_size < rhs.mem_size; }; + std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> mem_heap(cmp); + + _active_mem_usage = 0; + for (auto it = _writers.begin(); it != _writers.end();) { + if (auto writer = it->lock()) { + int64_t active_memtable_mem = writer->active_memtable_mem_consumption(); + _active_mem_usage += active_memtable_mem; + mem_heap.emplace(writer, active_memtable_mem); + ++it; } else { - _should_wait_flush = true; - reducing_mem_on_hard_limit = true; - oss << "because proc_mem_no_allocator_cache consumption " - << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache) - << ", has exceeded process soft limit " - << PrettyPrinter::print_bytes(process_soft_mem_limit) - << ", total load mem consumption: " - << PrettyPrinter::print_bytes(_mem_tracker->consumption()) - << ", vm_rss: " << PerfCounters::get_vm_rss_str(); + *it = std::move(_writers.back()); + _writers.pop_back(); } - LOG(INFO) << oss.str(); } - - // wait all writers flush without lock - for (auto item : writers_to_reduce_mem) { - VLOG_NOTICE << "reducing memory, wait flush mem_size: " - << PrettyPrinter::print_bytes(item.mem_size); - auto writer = item.writer.lock(); + int64_t mem_to_flush = _hard_limit_reached() + ? _active_mem_usage - _load_hard_mem_limit / 11 + : _active_mem_usage - _load_soft_mem_limit / 5 * 4; Review Comment: warning: 5 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp : _active_mem_usage - _load_soft_mem_limit / 5 * 4; ^ ``` ########## be/src/olap/memtable_memory_limiter.cpp: ########## @@ -59,135 +59,81 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer _writers.push_back(writer); } +bool MemTableMemoryLimiter::_soft_limit_reached() { + return _mem_tracker->consumption() >= _load_soft_mem_limit || + (MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit() && + _mem_tracker->consumption() >= _load_hard_mem_limit / 10); +} + +bool MemTableMemoryLimiter::_hard_limit_reached() { + return _mem_tracker->consumption() >= _load_hard_mem_limit || + MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit(); +} + void MemTableMemoryLimiter::handle_memtable_flush() { // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); - // Record current memory status. - int64_t process_soft_mem_limit = MemInfo::soft_mem_limit(); - int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache(); -#ifndef BE_TEST - // If process memory is almost full but data load don't consume more than 5% (50% * 10%) of - // total memory, we don't need to flush memtable. - bool reduce_on_process_soft_mem_limit = - proc_mem_no_allocator_cache >= process_soft_mem_limit && - _mem_tracker->consumption() >= _load_hard_mem_limit / 10; - if (_mem_tracker->consumption() < _load_soft_mem_limit && !reduce_on_process_soft_mem_limit) { + if (!_soft_limit_reached()) { return; } -#endif - // Indicate whether current thread is reducing mem on hard limit. - bool reducing_mem_on_hard_limit = false; - Status st; - std::vector<WriterMemItem> writers_to_reduce_mem; { MonotonicStopWatch timer; timer.start(); std::unique_lock<std::mutex> l(_lock); - while (_should_wait_flush) { - _wait_flush_cond.wait(l); - } - LOG(INFO) << "Reached the one tenth of load hard limit " << _load_hard_mem_limit / 10 - << "and process remaining allocator cache " << proc_mem_no_allocator_cache - << "reached process soft memory limit " << process_soft_mem_limit - << ", waited for flush, time_ns:" << timer.elapsed_time(); -#ifndef BE_TEST - bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit || - proc_mem_no_allocator_cache >= process_soft_mem_limit; - // Some other thread is flushing data, and not reached hard limit now, - // we don't need to handle mem limit in current thread. - if (_soft_reduce_mem_in_progress && !hard_limit_reached) { - return; - } -#endif - - auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { - return lhs.mem_size < rhs.mem_size; - }; - std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> mem_heap(cmp); - - for (auto it = _writers.begin(); it != _writers.end();) { - if (auto writer = it->lock()) { - int64_t active_memtable_mem = writer->active_memtable_mem_consumption(); - mem_heap.emplace(writer, active_memtable_mem); - ++it; - } else { - *it = std::move(_writers.back()); - _writers.pop_back(); + while (_soft_limit_reached()) { + LOG(INFO) << "reached memtable memory soft limit" + << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; + if (_active_mem_usage >= _load_soft_mem_limit || + (_active_mem_usage > _load_hard_mem_limit / 10 && _hard_limit_reached())) { + _flush_active_memtables(); } + _soft_limit_end_cond.wait(l); } - int64_t mem_to_flushed = _mem_tracker->consumption() / 10; - int64_t mem_consumption_in_picked_writer = 0; - while (!mem_heap.empty()) { - WriterMemItem mem_item = mem_heap.top(); - mem_heap.pop(); - auto writer = mem_item.writer.lock(); - if (!writer) { - continue; - } - int64_t mem_size = mem_item.mem_size; - writers_to_reduce_mem.emplace_back(writer, mem_size); - st = writer->flush_async(); - if (!st.ok()) { - auto err_msg = fmt::format( - "tablet writer failed to reduce mem consumption by flushing memtable, " - "tablet_id={}, err={}", - writer->tablet_id(), st.to_string()); - LOG(WARNING) << err_msg; - static_cast<void>(writer->cancel_with_status(st)); - } - mem_consumption_in_picked_writer += mem_size; - if (mem_consumption_in_picked_writer > mem_to_flushed) { - break; - } - } - if (writers_to_reduce_mem.empty()) { - // should not happen, add log to observe - LOG(WARNING) << "failed to find suitable writers to reduce memory" - << " when total load mem limit exceed"; - return; - } + timer.stop(); + int64_t time_ms = timer.elapsed_time() / 1000 / 1000; + LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit"; + } +} - std::ostringstream oss; - oss << "reducing memory of " << writers_to_reduce_mem.size() - << " memtable writers (total mem: " - << PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) - << ", max mem: " << PrettyPrinter::print_bytes(writers_to_reduce_mem.front().mem_size) - << ", min mem:" << PrettyPrinter::print_bytes(writers_to_reduce_mem.back().mem_size) - << "), "; - if (proc_mem_no_allocator_cache < process_soft_mem_limit) { - oss << "because total load mem consumption " - << PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded"; - if (_mem_tracker->consumption() > _load_hard_mem_limit) { - _should_wait_flush = true; - reducing_mem_on_hard_limit = true; - oss << " hard limit: " << PrettyPrinter::print_bytes(_load_hard_mem_limit); - } else { - _soft_reduce_mem_in_progress = true; - oss << " soft limit: " << PrettyPrinter::print_bytes(_load_soft_mem_limit); - } +void MemTableMemoryLimiter::_flush_active_memtables() { + auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { return lhs.mem_size < rhs.mem_size; }; + std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> mem_heap(cmp); + + _active_mem_usage = 0; + for (auto it = _writers.begin(); it != _writers.end();) { + if (auto writer = it->lock()) { + int64_t active_memtable_mem = writer->active_memtable_mem_consumption(); + _active_mem_usage += active_memtable_mem; + mem_heap.emplace(writer, active_memtable_mem); + ++it; } else { - _should_wait_flush = true; - reducing_mem_on_hard_limit = true; - oss << "because proc_mem_no_allocator_cache consumption " - << PrettyPrinter::print_bytes(proc_mem_no_allocator_cache) - << ", has exceeded process soft limit " - << PrettyPrinter::print_bytes(process_soft_mem_limit) - << ", total load mem consumption: " - << PrettyPrinter::print_bytes(_mem_tracker->consumption()) - << ", vm_rss: " << PerfCounters::get_vm_rss_str(); + *it = std::move(_writers.back()); + _writers.pop_back(); } - LOG(INFO) << oss.str(); } - - // wait all writers flush without lock - for (auto item : writers_to_reduce_mem) { - VLOG_NOTICE << "reducing memory, wait flush mem_size: " - << PrettyPrinter::print_bytes(item.mem_size); - auto writer = item.writer.lock(); + int64_t mem_to_flush = _hard_limit_reached() + ? _active_mem_usage - _load_hard_mem_limit / 11 Review Comment: warning: 11 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp ? _active_mem_usage - _load_hard_mem_limit / 11 ^ ``` ########## be/src/olap/memtable_memory_limiter.cpp: ########## @@ -59,135 +59,81 @@ void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer _writers.push_back(writer); } +bool MemTableMemoryLimiter::_soft_limit_reached() { + return _mem_tracker->consumption() >= _load_soft_mem_limit || + (MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit() && + _mem_tracker->consumption() >= _load_hard_mem_limit / 10); +} + +bool MemTableMemoryLimiter::_hard_limit_reached() { + return _mem_tracker->consumption() >= _load_hard_mem_limit || + MemInfo::proc_mem_no_allocator_cache() >= MemInfo::soft_mem_limit(); +} + void MemTableMemoryLimiter::handle_memtable_flush() { // Check the soft limit. DCHECK(_load_soft_mem_limit > 0); - // Record current memory status. - int64_t process_soft_mem_limit = MemInfo::soft_mem_limit(); - int64_t proc_mem_no_allocator_cache = MemInfo::proc_mem_no_allocator_cache(); -#ifndef BE_TEST - // If process memory is almost full but data load don't consume more than 5% (50% * 10%) of - // total memory, we don't need to flush memtable. - bool reduce_on_process_soft_mem_limit = - proc_mem_no_allocator_cache >= process_soft_mem_limit && - _mem_tracker->consumption() >= _load_hard_mem_limit / 10; - if (_mem_tracker->consumption() < _load_soft_mem_limit && !reduce_on_process_soft_mem_limit) { + if (!_soft_limit_reached()) { return; } -#endif - // Indicate whether current thread is reducing mem on hard limit. - bool reducing_mem_on_hard_limit = false; - Status st; - std::vector<WriterMemItem> writers_to_reduce_mem; { MonotonicStopWatch timer; timer.start(); std::unique_lock<std::mutex> l(_lock); - while (_should_wait_flush) { - _wait_flush_cond.wait(l); - } - LOG(INFO) << "Reached the one tenth of load hard limit " << _load_hard_mem_limit / 10 - << "and process remaining allocator cache " << proc_mem_no_allocator_cache - << "reached process soft memory limit " << process_soft_mem_limit - << ", waited for flush, time_ns:" << timer.elapsed_time(); -#ifndef BE_TEST - bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit || - proc_mem_no_allocator_cache >= process_soft_mem_limit; - // Some other thread is flushing data, and not reached hard limit now, - // we don't need to handle mem limit in current thread. - if (_soft_reduce_mem_in_progress && !hard_limit_reached) { - return; - } -#endif - - auto cmp = [](WriterMemItem& lhs, WriterMemItem& rhs) { - return lhs.mem_size < rhs.mem_size; - }; - std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, decltype(cmp)> mem_heap(cmp); - - for (auto it = _writers.begin(); it != _writers.end();) { - if (auto writer = it->lock()) { - int64_t active_memtable_mem = writer->active_memtable_mem_consumption(); - mem_heap.emplace(writer, active_memtable_mem); - ++it; - } else { - *it = std::move(_writers.back()); - _writers.pop_back(); + while (_soft_limit_reached()) { + LOG(INFO) << "reached memtable memory soft limit" + << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage) + << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage) + << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << ")"; + if (_active_mem_usage >= _load_soft_mem_limit || + (_active_mem_usage > _load_hard_mem_limit / 10 && _hard_limit_reached())) { + _flush_active_memtables(); } + _soft_limit_end_cond.wait(l); } - int64_t mem_to_flushed = _mem_tracker->consumption() / 10; - int64_t mem_consumption_in_picked_writer = 0; - while (!mem_heap.empty()) { - WriterMemItem mem_item = mem_heap.top(); - mem_heap.pop(); - auto writer = mem_item.writer.lock(); - if (!writer) { - continue; - } - int64_t mem_size = mem_item.mem_size; - writers_to_reduce_mem.emplace_back(writer, mem_size); - st = writer->flush_async(); - if (!st.ok()) { - auto err_msg = fmt::format( - "tablet writer failed to reduce mem consumption by flushing memtable, " - "tablet_id={}, err={}", - writer->tablet_id(), st.to_string()); - LOG(WARNING) << err_msg; - static_cast<void>(writer->cancel_with_status(st)); - } - mem_consumption_in_picked_writer += mem_size; - if (mem_consumption_in_picked_writer > mem_to_flushed) { - break; - } - } - if (writers_to_reduce_mem.empty()) { - // should not happen, add log to observe - LOG(WARNING) << "failed to find suitable writers to reduce memory" - << " when total load mem limit exceed"; - return; - } + timer.stop(); + int64_t time_ms = timer.elapsed_time() / 1000 / 1000; Review Comment: warning: 1000 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp int64_t time_ms = timer.elapsed_time() / 1000 / 1000; ^ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org