This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 8f46bf97b21 [fix](spill) Fix `WorkloadGroupManager` in spill (#49373)
8f46bf97b21 is described below

commit 8f46bf97b217af247dd5397298f8e1ddb58bd93b
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Tue Mar 25 19:15:07 2025 +0800

    [fix](spill) Fix `WorkloadGroupManager` in spill (#49373)

    ### What problem does this PR solve?

    Fix bugs and add comments.
---
 be/src/common/config.cpp                           |   3 +
 be/src/common/config.h                             |   3 +
 be/src/common/daemon.cpp                           |  20 ++-
 be/src/olap/memtable_flush_executor.cpp            |   2 +-
 be/src/pipeline/pipeline_task.cpp                  |   5 +-
 be/src/runtime/memory/global_memory_arbitrator.cpp |  11 +-
 be/src/runtime/memory/global_memory_arbitrator.h   |  21 +--
 be/src/runtime/memory/mem_tracker_limiter.cpp      |   4 +
 be/src/runtime/memory/mem_tracker_limiter.h        |   5 +
 be/src/runtime/memory/memory_profile.cpp           |   3 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |   3 +-
 be/src/runtime/thread_context.h                    |  44 +++---
 be/src/runtime/workload_group/workload_group.cpp   |  10 ++
 .../workload_group/workload_group_manager.cpp      | 169 +++++++++++++++------
 .../workload_group/workload_group_manager.h        |   2 +-
 be/src/vec/common/allocator.cpp                    |   4 +-
 be/src/vec/exec/scan/scanner_scheduler.cpp         |   8 +-
 .../runtime/memory/thread_mem_tracker_mgr_test.cpp |  16 +-
 18 files changed, 223 insertions(+), 110 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 30f546f9562..8698b72f9b7 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -661,6 +661,9 @@ DEFINE_Int32(load_process_soft_mem_limit_percent, "80");
 // memtable memory limiter will do nothing.
 DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
 
+// If there are a lot of memtable memory, then wait them flush finished.
+DEFINE_mDouble(load_max_wg_active_memtable_percent, "0.6");
+
 // result buffer cancelled time (unit: second)
 DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1c6948d4ee8..bba0d5abc9d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -700,6 +700,9 @@ DECLARE_Int32(load_process_soft_mem_limit_percent);
 // memtable memory limiter will do nothing.
 DECLARE_Int32(load_process_safe_mem_permit_percent);
 
+// If there are a lot of memtable memory, then wait them flush finished.
+DECLARE_mDouble(load_max_wg_active_memtable_percent);
+
 // result buffer cancelled time (unit: second)
 DECLARE_mInt32(result_buffer_cancelled_interval_time);
 
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 2fed93a6104..72b1e13aaa1 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -236,7 +236,7 @@ void refresh_cache_capacity() {
     if (doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
                 std::memory_order_relaxed)) {
         // the last cache capacity adjustment has not been completed.
-        // if not return, last_cache_capacity_adjust_weighted may be modified, but notify is ignored.
+        // if not return, last_periodic_refreshed_cache_capacity_adjust_weighted may be modified, but notify is ignored.
return; } if (refresh_cache_capacity_sleep_time_ms <= 0) { @@ -251,8 +251,8 @@ void refresh_cache_capacity() { AlgoUtil::descent_by_step(10, cache_capacity_reduce_mem_limit, doris::MemInfo::soft_mem_limit(), process_memory_usage); if (new_cache_capacity_adjust_weighted != - doris::GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted) { - doris::GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted = + doris::GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted) { + doris::GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted = new_cache_capacity_adjust_weighted; doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity(); refresh_cache_capacity_sleep_time_ms = config::memory_gc_sleep_time_ms; @@ -547,8 +547,8 @@ void Daemon::cache_adjust_capacity_thread() { l, std::chrono::milliseconds(100)); } double adjust_weighted = std::min<double>( - GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted, - GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted); + GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted, + GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted); if (_stop_background_threads_latch.count() == 0) { break; } @@ -560,13 +560,21 @@ void Daemon::cache_adjust_capacity_thread() { if (config::disable_memory_gc) { continue; } + if (GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted == + adjust_weighted) { + LOG(INFO) << fmt::format( + "[MemoryGC] adjust cache capacity end, adjust_weighted {} has not been " + "modified.", + adjust_weighted); + continue; + } std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>(""); auto freed_mem = CacheManager::instance()->for_each_cache_refresh_capacity(adjust_weighted, profile.get()); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( - "[MemoryGC] refresh cache capacity end, free memory {}, details: {}", + "[MemoryGC] adjust cache capacity end, free memory {}, details: {}", PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str()); GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted; } while (true); diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index fa49c2553ae..ad1adc6d7ea 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -148,7 +148,7 @@ Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& r Status st; do { // only try to reserve process memory - st = thread_context->try_reserve_process_memory(size); + st = thread_context->thread_mem_tracker_mgr->try_reserve(size, true); if (st.ok()) { memtable_flush_executor->inc_flushing_task(); break; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 152c1c79b87..a751acd7194 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -455,7 +455,7 @@ Status PipelineTask::execute(bool* done) { if (workload_group && _state->get_query_ctx()->enable_reserve_memory() && reserve_size > 0) { - auto st = thread_context()->try_reserve_memory(reserve_size); + auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size); COUNTER_UPDATE(_memory_reserve_times, 1); if (!st.ok() && !_state->enable_force_spill()) { @@ -509,7 +509,8 @@ Status PipelineTask::execute(bool* done) { !(wake_up_early() || _dry_run)) { const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, _eos); 
status = sink_reserve_size != 0 - ? thread_context()->try_reserve_memory(sink_reserve_size) + ? thread_context()->thread_mem_tracker_mgr->try_reserve( + sink_reserve_size) : Status::OK(); auto sink_revocable_mem_size = _sink->revocable_mem_size(_state); diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp index f538387bca6..7616569e819 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.cpp +++ b/be/src/runtime/memory/global_memory_arbitrator.cpp @@ -52,10 +52,13 @@ std::atomic<int64_t> GlobalMemoryArbitrator::refresh_interval_memory_growth = 0; std::mutex GlobalMemoryArbitrator::cache_adjust_capacity_lock; std::condition_variable GlobalMemoryArbitrator::cache_adjust_capacity_cv; std::atomic<bool> GlobalMemoryArbitrator::cache_adjust_capacity_notify {false}; -// This capacity is set by gc thread, it is running periodicity. -std::atomic<double> GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted {1}; -// This capacity is set by workload group spill disk thread -std::atomic<double> GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted {1}; +// This capacity is set by `refresh_cache_capacity`, it is running periodicity. +// modified when process memory changes. +std::atomic<double> GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted { + 1}; +// This capacity is set by workload group mgr `handle_paused_queries`, +// modified when a query enters paused state due to insufficient process memory. +std::atomic<double> GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted {1}; // The value that take affect std::atomic<double> GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1}; std::atomic<bool> GlobalMemoryArbitrator::any_workload_group_exceed_limit {false}; diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h index aa8939d8f6a..6b5b736c4fe 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.h +++ b/be/src/runtime/memory/global_memory_arbitrator.h @@ -148,17 +148,17 @@ public: } static std::string process_limit_exceeded_errmsg_str() { - return fmt::format( - "{} exceed limit {} or {} less than low water mark {}", - process_memory_used_details_str(), MemInfo::mem_limit_str(), - sys_mem_available_details_str(), - PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES)); + return fmt::format("{} exceed limit {} or {} less than low water mark {}", + process_memory_used_details_str(), MemInfo::mem_limit_str(), + sys_mem_available_str(), + PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), + TUnit::BYTES)); // only process memory print details } static std::string process_soft_limit_exceeded_errmsg_str() { return fmt::format("{} exceed soft limit {} or {} less than warning water mark {}.", process_memory_used_details_str(), MemInfo::soft_mem_limit_str(), - sys_mem_available_details_str(), + sys_mem_available_str(), PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES)); } @@ -172,9 +172,12 @@ public: static std::mutex cache_adjust_capacity_lock; static std::condition_variable cache_adjust_capacity_cv; static std::atomic<bool> cache_adjust_capacity_notify; - static std::atomic<double> last_cache_capacity_adjust_weighted; - // This capacity is set by workload group spill disk thread - static std::atomic<double> last_wg_trigger_cache_capacity_adjust_weighted; + // This capacity is set by memory maintenance 
thread `refresh_cache_capacity`, it is running periodicity, + // modified when process memory changes. + static std::atomic<double> last_periodic_refreshed_cache_capacity_adjust_weighted; + // This capacity is set by memory maintenance thread `handle_paused_queries`, in workload group mgr, + // modified when a query enters paused state due to process memory exceed. + static std::atomic<double> last_memory_exceeded_cache_capacity_adjust_weighted; // The value that take affect static std::atomic<double> last_affected_cache_capacity_adjust_weighted; static std::atomic<bool> any_workload_group_exceed_limit; diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 01328ebaa84..c0a0e64db67 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -397,6 +397,8 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() { int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, const std::string& cancel_reason, RuntimeProfile* profile, Type type) { + // should skip Type::LOAD `Memtable#`. + // TODO, changed to iterate over all query's `ResourceContext` instead of iterating over `MemTracker`. return free_top_memory_query( min_free_mem, type, ExecEnv::GetInstance()->mem_tracker_limiter_pool, [&cancel_reason, &type](int64_t mem_consumption, const std::string& label) { @@ -479,6 +481,7 @@ int64_t MemTrackerLimiter::free_top_memory_query( COUNTER_UPDATE(seek_tasks_counter, seek_num); COUNTER_UPDATE(previously_canceling_tasks_counter, canceling_task.size()); + // TODO, print resource_context->task_controller.debug_string() LOG(INFO) << log_prefix << "seek finished, seek " << seek_num << " tasks. among them, " << min_pq.size() << " tasks will be canceled, " << PrettyPrinter::print_bytes(prepare_free_mem) << " memory size prepare free; " @@ -494,6 +497,7 @@ int64_t MemTrackerLimiter::free_top_memory_query( min_pq.pop(); continue; } + // TODO, use resource_context->task_controller.cancel() ExecEnv::GetInstance()->fragment_mgr()->cancel_query( cancelled_queryid, Status::MemoryLimitExceeded(cancel_msg( min_pq.top().first, min_pq.top().second))); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index bed0c2e80ac..41fbd89556a 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -370,6 +370,11 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) { inline Status MemTrackerLimiter::check_limit(int64_t bytes) { // Do not enable check limit, because reserve process will check it. + // If reserve enabled, even if the reserved memory size is smaller than the actual requested memory, + // and the query memory consumption is larger than the limit, we do not expect the query to fail + // after `check_limit` returns an error, but to run as long as possible, + // and will enter the paused state and try to spill when the query reserves next time. + // If the workload group or process runs out of memory, it will be forced to cancel. 
if (bytes <= 0 || _enable_reserve_memory) { return Status::OK(); } diff --git a/be/src/runtime/memory/memory_profile.cpp b/be/src/runtime/memory/memory_profile.cpp index f9bed71071c..f3397a20b2c 100644 --- a/be/src/runtime/memory/memory_profile.cpp +++ b/be/src/runtime/memory/memory_profile.cpp @@ -264,6 +264,7 @@ void MemoryProfile::refresh_memory_overview_profile() { memory_untracked_memory_bytes << untracked_memory - memory_untracked_memory_bytes.get_value(); // 6 refresh additional tracker printed when memory exceeds limit. + // TODO, separate Framgnet and Memtable memory in Load memory. COUNTER_SET(_load_all_memtables_usage_counter, ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->consumption()); @@ -343,7 +344,7 @@ void MemoryProfile::print_log_process_usage() { if (_enable_print_log_process_usage) { _enable_print_log_process_usage = false; auto log_str = process_memory_detail_str(); - LOG_LONG_STRING(WARNING, log_str); + LOG_LONG_STRING(INFO, log_str); } } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index a8461e851ef..e65455cc6ca 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -82,7 +82,8 @@ public: void consume(int64_t size); void flush_untracked_mem(); - doris::Status try_reserve(int64_t size, bool only_check_process_memory); + // if only_check_process_memory == true, still reserve query, wg, process memory, only check process memory. + doris::Status try_reserve(int64_t size, bool only_check_process_memory = false); void shrink_reserved(); diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 53aee7d1377..4bf563b4b90 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -174,6 +174,8 @@ inline thread_local bool use_mem_hook = false; // 4. ThreadMemTrackerMgr // // There may be other optional info to be added later. +// +// Note: Keep the class simple and only add properties. class ThreadContext { public: ThreadContext() { thread_mem_tracker_mgr = std::make_unique<ThreadMemTrackerMgr>(); } @@ -226,18 +228,6 @@ public: // is released somewhere, the hook is triggered to cause the crash. std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr; - [[nodiscard]] std::shared_ptr<MemTrackerLimiter> thread_mem_tracker() const { - return thread_mem_tracker_mgr->limiter_mem_tracker(); - } - - doris::Status try_reserve_process_memory(const int64_t size) const { - return thread_mem_tracker_mgr->try_reserve(size, true); - } - - doris::Status try_reserve_memory(const int64_t size) const { - return thread_mem_tracker_mgr->try_reserve(size, false); - } - int thread_local_handle_count = 0; private: @@ -429,22 +419,24 @@ public: // Basic macros for mem tracker, usually do not need to be modified and used. #if defined(USE_MEM_TRACKER) && !defined(BE_TEST) // used to fix the tracking accuracy of caches. 
-#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ - do { \ - DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || \ - doris::thread_context()->thread_mem_tracker()->label() != "Orphan") \ - << doris::memory_orphan_check_msg; \ - doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to( \ - size, tracker); \ +#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ + do { \ + DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || \ + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label() != \ + "Orphan") \ + << doris::memory_orphan_check_msg; \ + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to( \ + size, tracker); \ } while (0) -#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ - do { \ - DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || \ - doris::thread_context()->thread_mem_tracker()->label() != "Orphan") \ - << doris::memory_orphan_check_msg; \ - tracker->transfer_to( \ - size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()); \ +#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ + do { \ + DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || \ + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label() != \ + "Orphan") \ + << doris::memory_orphan_check_msg; \ + tracker->transfer_to( \ + size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()); \ } while (0) // Mem Hook to consume thread mem tracker diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 1e4a3b5469d..499d8914949 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -290,6 +290,15 @@ int64_t WorkloadGroup::free_overcommited_memory(int64_t need_free_mem, RuntimePr _id, _name, _memory_limit, used_memory, need_free_mem, freed_mem); }}; + // the query being canceled is not counted in `freed_mem`, + // so `handle_paused_queries` may cancel more queries than expected. + // + // TODO, in `MemTrackerLimiter::free_xxx`, for the query being canceled, + // if (current time - cancel start time) < 2s (a config), the query memory is counted in `freed_mem`, + // and the query memory is expected to be released soon. + // if > 2s, the query memory will not be counted in `freed_mem`, + // and the query may be blocked during the cancel process. skip this query and continue to cancel other queries. + // 1. free top overcommit query RuntimeProfile* tmq_profile = profile->create_child( fmt::format("FreeGroupTopOvercommitQuery:Name {}", _name), true, true); @@ -337,6 +346,7 @@ int64_t WorkloadGroup::free_overcommited_memory(int64_t need_free_mem, RuntimePr return freed_mem; } +// TODO, remove this function, replaced by free_overcommited_memory. 
int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) { if (need_free_mem <= 0) { return 0; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 015e38e3ec9..66370111ec4 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -353,6 +353,10 @@ void WorkloadGroupMgr::handle_paused_queries() { continue; } } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + // here query is paused because of WORKLOAD_GROUP_MEMORY_EXCEEDED, + // wg of the current query may not actually exceed the limit, + // just (wg consumption + current query expected reserve memory > wg memory limit) + // // Only deal with non overcommit workload group. if (wg->enable_memory_overcommit()) { // Soft limit wg will only reserve failed when process limit exceed. But in some corner case, @@ -368,9 +372,20 @@ void WorkloadGroupMgr::handle_paused_queries() { query_it = queries_list.erase(query_it); continue; } - // check if the reserve is too large, if it is too large, - // should set the query's limit only. - // Check the query's reserve with expected limit. + // if the current query memory consumption + expected reserve memory exceeds the limit, + // it may be that the expected reserve memory is too large, + // wg memory is insufficient at this time, + // so the current query should try to release memory by itself, + // but here we did not directly try to spill this query, + // set the query's limit only, and then wake up the current query to continue execution. + // + // if the expected reserve memory estimate is correct, high probability, + // query will enter the pause state again, the reason is expected to be QUERY_MEMORY_EXCEEDED, + // and handle_single_query_ will be called to spill. + // + // Of course, if the actual required memory is less than the reserved memory, + // or if there is enough memory when continuing to execute, + // it will run successfully without spilling. if (query_ctx->adjusted_mem_limit() < query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) { query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit()); @@ -385,8 +400,7 @@ void WorkloadGroupMgr::handle_paused_queries() { continue; } if (flushed_memtable_bytes <= 0) { - flushed_memtable_bytes = - flush_memtable_from_current_group_(wg, query_it->reserve_size_); + flushed_memtable_bytes = flush_memtable_from_current_group_(wg); } if (flushed_memtable_bytes > 0) { // Flushed some memtable, just wait flush finished and not do anything more. @@ -394,6 +408,18 @@ void WorkloadGroupMgr::handle_paused_queries() { ++query_it; continue; } + + // when running here, current query adjusted_mem_limit < query memory consumption + reserve_size, + // which means that the current query itself has not exceeded the memory limit. + // + // this means that there must be queries in the wg of the current query whose memory exceeds + // adjusted_mem_limit, but these queries may not have entered the paused state, + // so these queries may not modify the mem limit and continue to execute + // when (adjusted_mem_limit < consumption + reserve_size_) is judged above. + // + // so call `update_queries_limit_` to force the update of the mem_limit of all queries + // in the wg of the current query to the adjusted_mem_limit, + // hoping that these queries that exceed limit will release memory. 
if (!has_changed_hard_limit) { update_queries_limit_(wg, true); has_changed_hard_limit = true; @@ -410,9 +436,13 @@ void WorkloadGroupMgr::handle_paused_queries() { << ", wg: " << wg->debug_string(); } if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) { + // we not encourage not enable slot memory. + // // If not enable slot memory policy, then should spill directly - // Maybe there are another query that use too much memory, but we - // not encourage not enable slot memory. + // Maybe there are another query that use too much memory, if these queries + // exceed the memory limit, they will enter the paused state + // due to `QUERY_MEMORY_EXCEEDED` and will also try to spill. + // // TODO should kill the query that exceed limit. bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_, query_it->elapsed_time(), @@ -430,7 +460,7 @@ void WorkloadGroupMgr::handle_paused_queries() { // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient, // and then set wg's flag, other query may not free memory very quickly. if (query_it->elapsed_time() > config::spill_in_paused_queue_timeout_ms) { - // set wg's memory to insufficent, then add it back to task scheduler to run. + // set wg's memory to sufficient, then add it back to task scheduler to run. LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " will be resume."; query_ctx->set_memory_sufficient(true); @@ -445,35 +475,58 @@ void WorkloadGroupMgr::handle_paused_queries() { has_query_exceed_process_memlimit = true; // If wg's memlimit not exceed, but process memory exceed, it means cache or other metadata // used too much memory. Should clean all cache here. - // 1. Check cache used, if cache is larger than > 0, then just return and wait for it to 0 to release some memory. + // + // here query is paused because of PROCESS_MEMORY_EXCEEDED, + // normally, before process memory exceeds, daemon thread `refresh_cache_capacity` will + // adjust the cache capacity to 0. + // but at this time, process may not actually exceed the limit, + // just (process memory + current query expected reserve memory > process memory limit) + // so the behavior at this time is the same as the process memory limit exceed, clear all cache. if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted > 0.05 && - doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted > - 0.05) { - doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = - 0.04; + doris::GlobalMemoryArbitrator:: + last_memory_exceeded_cache_capacity_adjust_weighted > 0.05) { + doris::GlobalMemoryArbitrator:: + last_memory_exceeded_cache_capacity_adjust_weighted = 0.04; doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity(); LOG(INFO) << "There are some queries need process memory, so that set cache " - "capacity " - "to 0 now"; + "capacity to 0 now"; } + + // `cache_ratio_ < 0.05` means that the cache has been cleared + // before the query enters the paused state. + // but the query is still paused because of process memory exceed, + // so here we will try to continue to release other memory. + // // need to check config::disable_memory_gc here, if not, when config::disable_memory_gc == true, // cache is not adjusted, query_it->cache_ratio_ will always be 1, and this if branch will nenver - // execute, this query will never be resumed, and will deadlock here + // execute, this query will never be resumed, and will deadlock here. 
if ((!config::disable_memory_gc && query_it->cache_ratio_ < 0.05) || config::disable_memory_gc) { // 1. Check if could revoke some memory from memtable if (flushed_memtable_bytes <= 0) { - flushed_memtable_bytes = - flush_memtable_from_current_group_(wg, query_it->reserve_size_); + // if the process memory has exceeded the limit, it is expected that + // `MemTableMemoryLimiter` will flush most of the memtable. + // but if the process memory is not exceeded, and the current query expected reserve memory + // to be too large, the other parts of the process cannot perceive the reserve memory size, + // so it is expected to flush memtable in `handle_paused_queries`. + flushed_memtable_bytes = flush_memtable_from_current_group_(wg); } if (flushed_memtable_bytes > 0) { // Flushed some memtable, just wait flush finished and not do anything more. + wg->enable_write_buffer_limit(true); ++query_it; continue; } // TODO should wait here to check if the process has release revoked_size memory and then continue. if (!has_revoked_from_other_group) { + // `need_free_mem` is equal to the `reserve_size_` of the first query + // that `handle_paused_queries` reaches here this time. + // this means that at least `reserve_size_` memory is released from other wgs. + // the released memory at least allows the current query to execute, + // but we will wake up all queries after this `handle_paused_queries`, + // even if the released memory is not enough for all queries to execute, + // but this can simplify the behavior and omit the query priority. int64_t revoked_size = revoke_memory_from_other_group_( query_ctx, wg->enable_memory_overcommit(), query_it->reserve_size_); if (revoked_size > 0) { @@ -508,6 +561,12 @@ void WorkloadGroupMgr::handle_paused_queries() { continue; } } + // `cache_ratio_ > 0.05` means that the cache has not been cleared + // when the query enters the paused state. + // `last_affected_cache_capacity_adjust_weighted < 0.05` means that + // the cache has been cleared at this time. + // this means that the cache has been cleaned after the query enters the paused state. + // assuming that some memory has been released, wake up the query to continue execution. if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted < 0.05 && query_it->cache_ratio_ > 0.05) { @@ -521,6 +580,9 @@ void WorkloadGroupMgr::handle_paused_queries() { } } + // even if wg has no query in the paused state, the following code will still be executed + // because `handle_paused_queries` adds a <wg, empty set> to `_paused_queries_list` at the beginning. + bool is_low_watermark = false; bool is_high_watermark = false; wg->check_mem_used(&is_low_watermark, &is_high_watermark); @@ -538,15 +600,20 @@ void WorkloadGroupMgr::handle_paused_queries() { } } - if (has_query_exceed_process_memlimit) { - // No query failed due to process exceed limit, so that enable cache now. - doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 1; + if (!has_query_exceed_process_memlimit) { + // No query paused due to process exceed limit, so that enable cache now. 
+ doris::GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted = + doris::GlobalMemoryArbitrator:: + last_periodic_refreshed_cache_capacity_adjust_weighted.load( + std::memory_order_relaxed); + doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity(); + LOG(INFO) << "No query was paused due to insufficient process memory, so that set cache " + "capacity to last_periodic_refreshed_cache_capacity_adjust_weighted now"; } } -// Return the expected free bytes if memtable could flush -int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(WorkloadGroupPtr wg, - int64_t need_free_mem) { +// Return the expected free bytes if wg's memtable memory is greater than Max. +int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(WorkloadGroupPtr wg) { // If there are a lot of memtable memory, then wait them flush finished. MemTableMemoryLimiter* memtable_limiter = doris::ExecEnv::GetInstance()->memtable_memory_limiter(); @@ -556,15 +623,13 @@ int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(WorkloadGroupPtr wg DCHECK(memtable_limiter != nullptr) << "memtable limiter is nullptr"; memtable_limiter->get_workload_group_memtable_usage( wg->id(), &memtable_active_bytes, &memtable_queue_bytes, &memtable_flush_bytes); - // TODO: should add a signal in memtable limiter to prevent new batch - // For example, streamload, it will not reserve many memory, but it will occupy many memtable memory. - // TODO: 0.2 should be a workload group properties. For example, the group is optimized for load,then the value - // should be larged, if the group is optimized for query, then the value should be smaller. int64_t max_wg_memtable_bytes = wg->write_buffer_limit(); if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes > max_wg_memtable_bytes) { + auto max_wg_active_memtable_bytes = + (int64_t)(max_wg_memtable_bytes * config::load_max_wg_active_memtable_percent); // There are many table in flush queue, just waiting them flush finished. - if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) { + if (memtable_active_bytes < max_wg_active_memtable_bytes) { LOG_EVERY_T(INFO, 60) << wg->name() << " load memtable size is: " << memtable_active_bytes << ", " << memtable_queue_bytes << ", " << memtable_flush_bytes @@ -574,13 +639,13 @@ int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(WorkloadGroupPtr wg } else { // Flush some memtables(currently written) to flush queue. 
memtable_limiter->flush_workload_group_memtables( - wg->id(), memtable_active_bytes - (int64_t)(max_wg_memtable_bytes * 0.6)); + wg->id(), memtable_active_bytes - max_wg_active_memtable_bytes); LOG_EVERY_T(INFO, 60) << wg->name() << " load memtable size is: " << memtable_active_bytes << ", " << memtable_queue_bytes << ", " << memtable_flush_bytes << ", flush some active memtable to revoke memory"; return memtable_queue_bytes + memtable_flush_bytes + memtable_active_bytes - - (int64_t)(max_wg_memtable_bytes * 0.6); + max_wg_active_memtable_bytes; } } return 0; @@ -598,19 +663,21 @@ int64_t WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryC if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { return total_freed_mem; } - if (hard_limit) { - freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - total_freed_mem, - doris::QUERY_MIN_MEMORY, profile.get()); - } else { - freed_mem = cancel_top_query_in_overcommit_group_( - need_free_mem - total_freed_mem, requestor->get_mem_tracker()->consumption(), - profile.get()); - } - total_freed_mem += freed_mem; - // The revoke process may kill current requestor, so should return now. - if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { - return total_freed_mem; - } + // TODO, remove cancel_top_query_in_overcommit_group_, in `revoke_overcommited_memory_` already include + // "cancel top query". + // if (hard_limit) { + // freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - total_freed_mem, + // doris::QUERY_MIN_MEMORY, profile.get()); + // } else { + // freed_mem = cancel_top_query_in_overcommit_group_( + // need_free_mem - total_freed_mem, requestor->get_mem_tracker()->consumption(), + // profile.get()); + // } + // total_freed_mem += freed_mem; + // // The revoke process may kill current requestor, so should return now. + // if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + // return total_freed_mem; + // } return total_freed_mem; } @@ -655,6 +722,7 @@ int64_t WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryConte // If the memtable is too large, then flush them and wait for finished. int64_t WorkloadGroupMgr::revoke_memtable_from_overcommited_groups_(int64_t need_free_mem, RuntimeProfile* profile) { + // TODO, reuse `flush_memtable_from_current_group_`. return 0; } @@ -703,8 +771,11 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx->set_memory_sufficient(true); return true; } else if (time_in_queue >= config::spill_in_paused_queue_timeout_ms) { - // if cannot find any memory to release, then let the query continue to run as far as possible - // or cancelled by gc if memory is really not enough. + // if cannot find any memory to release, then let the query continue to run as far as possible. + // after `disable_reserve_memory`, the query will not enter the paused state again, + // if the memory is really insufficient, Allocator will throw an exception + // of query memory limit exceed and the query will be canceled, + // or it will be canceled by memory gc when the process memory exceeds the limit. 
auto log_str = fmt::format( "Query {} memory limit is exceeded, but could " "not find memory that could release or spill to disk, disable reserve " @@ -853,7 +924,7 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha } } // calculate per query weighted memory limit - debug_msg = "Query Memory Summary: \n"; + debug_msg += "\nQuery Memory Summary: \n"; for (const auto& query : all_query_ctxs) { auto query_ctx = query.second.lock(); if (!query_ctx) { @@ -866,6 +937,9 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha int64_t expected_query_weighted_mem_limit = 0; // If the query enable hard limit, then it should not use the soft limit if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::FIXED) { + // TODO, `Policy::FIXED` expects `all_query_used_slot_count < wg_total_slot_count`, + // which is controlled when query is submitted + // DCEHCK(total_used_slot_count <= total_slot_count); if (total_slot_count < 1) { LOG(WARNING) << "Query " << print_id(query_ctx->query_id()) @@ -881,6 +955,7 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha // If low water mark is not reached, then use process memory limit as query memory limit. // It means it will not take effect. // If there are some query in paused list, then limit should take effect. + // numerator `+ total_used_slot_count` ensures that the result is greater than 1. expected_query_weighted_mem_limit = total_used_slot_count > 0 ? (int64_t)((wg_high_water_mark_except_load + total_used_slot_count) * diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index ab4103fe251..9f9a679aa81 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -109,7 +109,7 @@ private: int64_t cancel_top_query_in_overcommit_group_(int64_t need_free_mem, int64_t lower_bound, RuntimeProfile* profile); - int64_t flush_memtable_from_current_group_(WorkloadGroupPtr wg, int64_t need_free_mem); + int64_t flush_memtable_from_current_group_(WorkloadGroupPtr wg); bool handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx, size_t size_to_reserve, int64_t time_in_queue, Status paused_reason); int64_t revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor, diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 045502631fd..57394f65c1b 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -97,7 +97,9 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_mem ->thread_mem_tracker_mgr->limiter_mem_tracker() ->consumption()), doris::PrettyPrinter::print_bytes( - doris::thread_context()->thread_mem_tracker()->reserved_consumption()), + doris::thread_context() + ->thread_mem_tracker_mgr->limiter_mem_tracker() + ->reserved_consumption()), doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(), doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index aae746bcc6c..dfbecd06634 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -281,8 +281,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, // During low memory mode, every scan task will return at most 2 block to reduce memory usage. 
while (!eos && raw_bytes_read < raw_bytes_threshold && !(ctx->low_memory_mode() && has_first_full_block) && - !(has_first_full_block && - doris::thread_context()->thread_mem_tracker()->limit_exceeded())) { + !(has_first_full_block && doris::thread_context() + ->thread_mem_tracker_mgr->limiter_mem_tracker() + ->limit_exceeded())) { if (UNLIKELY(ctx->done())) { eos = true; break; @@ -298,7 +299,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, } else { if (state->get_query_ctx()->enable_reserve_memory()) { size_t block_avg_bytes = scanner->get_block_avg_bytes(); - auto st = thread_context()->try_reserve_memory(block_avg_bytes); + auto st = thread_context()->thread_mem_tracker_mgr->try_reserve( + block_avg_bytes); if (!st.ok()) { handle_reserve_memory_failure(state, ctx, st, block_avg_bytes); break; diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp index 421a09d4985..eb03d1874f8 100644 --- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp +++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp @@ -250,7 +250,7 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) { thread_context->thread_mem_tracker_mgr->consume(size2); EXPECT_EQ(t->consumption(), size1 + size2); - auto st = thread_context->try_reserve_memory(size3); + auto st = thread_context->thread_mem_tracker_mgr->try_reserve(size3); EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_EQ(t->consumption(), size1 + size2 + size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); @@ -289,7 +289,7 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) { EXPECT_EQ(t->consumption(), size1 + size2); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); - st = thread_context->try_reserve_memory(size3); + st = thread_context->thread_mem_tracker_mgr->try_reserve(size3); EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_EQ(t->consumption(), size1 + size2 + size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); @@ -355,7 +355,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) { EXPECT_EQ(t->consumption(), size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); - st = thread_context->try_reserve_memory(size2); + st = thread_context->thread_mem_tracker_mgr->try_reserve(size2); EXPECT_TRUE(st.ok()) << st.to_string(); // ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2 // ThreadMemTrackerMgr _untracked_mem = 0 @@ -363,9 +363,9 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) { EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); // size3 - size2 + size2 - st = thread_context->try_reserve_memory(size3); + st = thread_context->thread_mem_tracker_mgr->try_reserve(size3); EXPECT_TRUE(st.ok()) << st.to_string(); - st = thread_context->try_reserve_memory(size3); + st = thread_context->thread_mem_tracker_mgr->try_reserve(size3); EXPECT_TRUE(st.ok()) << st.to_string(); thread_context->thread_mem_tracker_mgr->consume(size3); thread_context->thread_mem_tracker_mgr->consume(size2); @@ -403,14 +403,14 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) { int64_t size3 = size2 * 2; thread_context->attach_task(rc); - auto st = thread_context->try_reserve_memory(size3); + auto st = thread_context->thread_mem_tracker_mgr->try_reserve(size3); EXPECT_TRUE(st.ok()) << st.to_string(); thread_context->thread_mem_tracker_mgr->consume(size2); EXPECT_EQ(t1->consumption(), size3); 
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
 
     thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
-    st = thread_context->try_reserve_memory(size3);
+    st = thread_context->thread_mem_tracker_mgr->try_reserve(size3);
     EXPECT_TRUE(st.ok()) << st.to_string();
     EXPECT_EQ(t1->consumption(), size3);
     EXPECT_EQ(t2->consumption(), size3);
@@ -422,7 +422,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
 
     thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
-    st = thread_context->try_reserve_memory(size3);
+    st = thread_context->thread_mem_tracker_mgr->try_reserve(size3);
     EXPECT_TRUE(st.ok()) << st.to_string();
     EXPECT_EQ(t1->consumption(), size3);
     EXPECT_EQ(t2->consumption(), size3 + size2);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
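Note: the sketch below is illustrative only and is not part of the commit. It shows the call-site pattern this change standardizes on, where the removed ThreadContext::try_reserve_memory()/try_reserve_process_memory() wrappers are replaced by calling thread_mem_tracker_mgr->try_reserve() directly. It assumes compilation inside the Doris BE source tree; the helper name reserve_before_large_alloc and the include paths are assumptions, while try_reserve(), its only_check_process_memory default, and shrink_reserved() come from the diff above.

// Minimal call-site sketch (assumed to live under be/src/ in the Doris BE tree).
#include "common/status.h"          // doris::Status (assumed include path)
#include "runtime/thread_context.h" // doris::thread_context() (assumed include path)

namespace doris {

// Hypothetical helper: reserve `bytes` from the query / workload group / process
// trackers before a memory-intensive step, instead of letting the allocation
// overshoot the limits.
Status reserve_before_large_alloc(int64_t bytes) {
    // Default only_check_process_memory = false: reserve query, workload group and
    // process memory. Pass `true` to reserve but only check process memory, as
    // FlushToken::_try_reserve_memory() now does in this commit.
    Status st = thread_context()->thread_mem_tracker_mgr->try_reserve(bytes);
    if (!st.ok()) {
        // Reservation failed (e.g. QUERY_MEMORY_EXCEEDED or PROCESS_MEMORY_EXCEEDED);
        // the caller decides whether to spill, shrink the request, or pause.
        return st;
    }

    // ... memory-intensive work goes here ...

    // Return whatever part of the reservation was not actually consumed.
    thread_context()->thread_mem_tracker_mgr->shrink_reserved();
    return Status::OK();
}

} // namespace doris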