xinyiZzz commented on code in PR #47462: URL: https://github.com/apache/doris/pull/47462#discussion_r2003101471
########## be/src/runtime/workload_group/workload_group_manager.cpp: ########## @@ -287,6 +256,624 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() { } } +void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, + int64_t reserve_size, const Status& status) { + DCHECK(query_ctx != nullptr); + query_ctx->update_paused_reason(status); + query_ctx->set_low_memory_mode(); + query_ctx->set_memory_sufficient(false); + std::lock_guard<std::mutex> lock(_paused_queries_lock); + auto wg = query_ctx->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace( + query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, reserve_size); + // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit + // if hard limit is enabled, then not need enable other queries hard limit. + if (inserted) { + LOG(INFO) << "Insert one new paused query: " << query_ctx->debug_string() + << ", workload group: " << wg->debug_string(); + } +} + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). + * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit + * strategy 3: If any query exceed process memlimit, then should clear all caches. + * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it. + * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: + */ +void WorkloadGroupMgr::handle_paused_queries() { + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list[wg].empty()) { + // Add an empty set to wg that not contains paused queries. + } + } + } + + std::unique_lock<std::mutex> lock(_paused_queries_lock); + bool has_revoked_from_other_group = false; + bool has_query_exceed_process_memlimit = false; + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + const auto& wg = it->first; + + LOG_EVERY_T(INFO, 120) << "Paused queries count: " << queries_list.size(); + + bool is_low_watermark = false; + bool is_high_watermark = false; + wg->check_mem_used(&is_low_watermark, &is_high_watermark); + + bool has_changed_hard_limit = false; + int64_t flushed_memtable_bytes = 0; + // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. + // The query's memlimit is set using slot mechanism and its value is set using the user settings, not + // by weighted value. So if reserve failed, then it is actually exceed limit. + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + auto query_ctx = query_it->query_ctx_.lock(); + // The query is finished during in paused list. + if (query_ctx == nullptr) { + LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; + query_it = queries_list.erase(query_it); + continue; + } + if (query_ctx->is_cancelled()) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + + if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { + // Streamload, kafka load, group commit will never have query memory exceeded error because + // their query limit is very large. + bool spill_res = + handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + query_it = queries_list.erase(query_it); + continue; + } + } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + // Only deal with non overcommit workload group. + if (wg->enable_memory_overcommit()) { Review Comment: 不会,disable 后就不会再进到paused list里了,所以不会死等,如果内存不足就被 allocator或gc cancel。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org