This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4b7eafb007102aa8cab1b66a61c590c5d36c77f7
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Oct 30 19:23:44 2024 +0800

    Should block memtable only when buffer limit reached (#42927)

    ## Proposed changes

    Issue Number: close #xxx

    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/runtime/query_context.cpp                   |  3 +-
 be/src/runtime/query_context.h                     |  5 +-
 be/src/runtime/workload_group/workload_group.cpp   | 23 +++++---
 be/src/runtime/workload_group/workload_group.h     |  7 +++
 .../workload_group/workload_group_manager.cpp      | 63 +++++++++++++++-------
 .../resource/workloadgroup/WorkloadGroup.java      | 43 ++++++++++++++-
 .../resource/workloadgroup/WorkloadGroupTest.java  | 20 +++++++
 gensrc/thrift/BackendService.thrift                |  7 +++
 8 files changed, 140 insertions(+), 31 deletions(-)

diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 84b4cbc182d..753a5fc5ef0 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -148,6 +148,7 @@ void QueryContext::_init_query_mem_tracker() {
     }
     query_mem_tracker->set_overcommit(enable_mem_overcommit());
     _user_set_mem_limit = bytes_limit;
+    _expected_mem_limit = _user_set_mem_limit;
 }
 
 QueryContext::~QueryContext() {
@@ -386,7 +387,7 @@ void QueryContext::add_fragment_profile(
 #endif
 
     std::lock_guard<std::mutex> l(_profile_mutex);
-    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ",
+    LOG_INFO("Add fragment profile, query {}, fragment {}, pipeline profile count {} ",
              print_id(this->_query_id), fragment_id, pipeline_profiles.size());
 
     _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 04c32535100..c7143c339eb 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -235,9 +235,7 @@ public:
 
     // This method is called by workload group manager to set query's memlimit using slot
     // If user set query limit explicitly, then should use less one
-    void set_mem_limit(int64_t new_mem_limit) {
-        query_mem_tracker->set_limit(std::min<int64_t>(new_mem_limit, _user_set_mem_limit));
-    }
+    void set_mem_limit(int64_t new_mem_limit) { query_mem_tracker->set_limit(new_mem_limit); }
 
     int64_t get_mem_limit() const { return query_mem_tracker->limit(); }
 
@@ -245,6 +243,7 @@ public:
         _expected_mem_limit = std::min<int64_t>(new_mem_limit, _user_set_mem_limit);
     }
 
+    // The expected mem limit is the limit applied once the workload group reaches its limit.
    int64_t expected_mem_limit() { return _expected_mem_limit; }
 
     std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return query_mem_tracker; }
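The two query_context hunks above split one knob into two: set_mem_limit() now applies the manager's value to the tracker as-is, while the clamping against any user-set limit moves to the separate "expected" limit. A minimal sketch of that split under simplified, assumed names (QueryLimits and _applied_limit are illustrative stand-ins, not Doris members):

#include <algorithm>
#include <cstdint>

// Simplified stand-in for QueryContext's limit bookkeeping; the real class
// delegates the applied limit to a MemTrackerLimiter.
class QueryLimits {
public:
    explicit QueryLimits(int64_t user_set_limit)
            : _user_set_mem_limit(user_set_limit), _expected_mem_limit(user_set_limit) {}

    // After this commit the manager's value is applied unclamped...
    void set_mem_limit(int64_t new_mem_limit) { _applied_limit = new_mem_limit; }

    // ...and only the "expected" limit, used once the workload group reaches
    // its limit, is clamped by the user-set limit.
    void set_expected_mem_limit(int64_t new_mem_limit) {
        _expected_mem_limit = std::min<int64_t>(new_mem_limit, _user_set_mem_limit);
    }

    int64_t expected_mem_limit() const { return _expected_mem_limit; }

private:
    int64_t _user_set_mem_limit;
    int64_t _expected_mem_limit;
    int64_t _applied_limit = 0;
};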
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 1ba4a1c1c35..ca34c6bf1b6 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -47,8 +47,8 @@ namespace doris {
 const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
 const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
 const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
-const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
-const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
+const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 75;
+const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 90;
 // This is a invalid value, and should ignore this value during usage
 const static int TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE = 0;
 const static int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20;
@@ -70,7 +70,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
           _spill_high_watermark(tg_info.spill_high_watermark),
           _scan_bytes_per_second(tg_info.read_bytes_per_second),
           _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
-          _total_query_slot_count(tg_info.total_query_slot_count) {
+          _total_query_slot_count(tg_info.total_query_slot_count),
+          _slot_mem_policy(tg_info.slot_mem_policy) {
     std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
     for (const auto& data_dir : data_dir_list) {
         _scan_io_throttle_map[data_dir.path] =
@@ -93,8 +94,8 @@ std::string WorkloadGroup::debug_string() const {
     auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 1);
     return fmt::format(
             "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
-            "total_query_slot_count={}, "
-            "memory_limit = {}, write_buffer_ratio= {}%, "
+            "total_query_slot_count = {}, "
+            "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= {}%, "
             "enable_memory_overcommit = {}, total_mem_used = {},"
             "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, spill_low_watermark = "
             "{}, spill_high_watermark = {},cpu_hard_limit = {}, scan_thread_num = "
@@ -102,8 +103,8 @@ std::string WorkloadGroup::debug_string() const {
             "is_shutdown={}, query_num={}, "
             "read_bytes_per_second={}, remote_read_bytes_per_second={}]",
             _id, _name, _version, cpu_share(), _total_query_slot_count,
-            PrettyPrinter::print(_memory_limit, TUnit::BYTES), _load_buffer_ratio,
-            _enable_memory_overcommit ? "true" : "false",
+            PrettyPrinter::print(_memory_limit, TUnit::BYTES), to_string(_slot_mem_policy),
+            _load_buffer_ratio, _enable_memory_overcommit ? "true" : "false",
             PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
             PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), TUnit::BYTES),
             mem_used_ratio, _spill_low_watermark, _spill_high_watermark, cpu_hard_limit(),
@@ -176,6 +177,7 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
         _remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second;
         _total_query_slot_count = tg_info.total_query_slot_count;
         _load_buffer_ratio = tg_info.write_buffer_ratio;
+        _slot_mem_policy = tg_info.slot_mem_policy;
     } else {
         return;
     }
@@ -551,6 +553,12 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
         write_buffer_ratio = tworkload_group_info.write_buffer_ratio;
     }
 
+    // 18 slot memory policy
+    TWgSlotMemoryPolicy::type slot_mem_policy = TWgSlotMemoryPolicy::DISABLED;
+    if (tworkload_group_info.__isset.slot_memory_policy) {
+        slot_mem_policy = tworkload_group_info.slot_memory_policy;
+    }
+
     return {.id = tg_id,
             .name = name,
             .cpu_share = cpu_share,
@@ -567,6 +575,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
             .read_bytes_per_second = read_bytes_per_second,
             .remote_read_bytes_per_second = remote_read_bytes_per_second,
             .total_query_slot_count = total_query_slot_count,
+            .slot_mem_policy = slot_mem_policy,
             .write_buffer_ratio = write_buffer_ratio};
 }
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 1d1a65547e8..6e2fa426cf6 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -29,6 +29,7 @@
 #include <string>
 #include <unordered_set>
 
+#include "common/factory_creator.h"
 #include "common/status.h"
 #include "service/backend_options.h"
 #include "util/hash_util.hpp"
@@ -55,6 +56,8 @@ class WorkloadGroup;
 struct WorkloadGroupInfo;
 struct TrackerLimiterGroup;
 class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
+    ENABLE_FACTORY_CREATOR(WorkloadGroup);
+
 public:
     explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
 
@@ -136,6 +139,8 @@ public:
         return _memory_limit > 0;
     }
 
+    TWgSlotMemoryPolicy::type slot_memory_policy() const { return _slot_mem_policy; }
+
     bool exceed_limit() {
         std::shared_lock<std::shared_mutex> r_lock(_mutex);
         return _memory_limit > 0 ? _total_mem_used > _memory_limit : false;
@@ -240,6 +245,7 @@ private:
     std::atomic<int64_t> _scan_bytes_per_second {-1};
     std::atomic<int64_t> _remote_scan_bytes_per_second {-1};
     std::atomic<int> _total_query_slot_count = 0;
+    std::atomic<TWgSlotMemoryPolicy::type> _slot_mem_policy {TWgSlotMemoryPolicy::DISABLED};
 
     // means workload group is mark dropped
     // new query can not submit
@@ -284,6 +290,7 @@ struct WorkloadGroupInfo {
     const int read_bytes_per_second = -1;
     const int remote_read_bytes_per_second = -1;
     const int total_query_slot_count = 0;
+    const TWgSlotMemoryPolicy::type slot_mem_policy = TWgSlotMemoryPolicy::DISABLED;
     const int write_buffer_ratio = 0;
     // log cgroup cpu info
     uint64_t cgroup_cpu_shares = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 3d820293694..853f3740551 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -279,7 +279,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
             }
         }
     }
-    const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
+    const int64_t TIMEOUT_IN_QUEUE = 1000L * 3;
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
     bool has_revoked_from_other_group = false;
    for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) {
@@ -372,17 +372,34 @@ void WorkloadGroupMgr::handle_paused_queries() {
                                  "so that other query will reduce their memory. wg: "
                           << wg->debug_string();
                 }
-                // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient,
-                // and then set wg's flag, other query may not free memory very quickly.
-                if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
-                    // set wg's memory to insufficent, then add it back to task scheduler to run.
-                    LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << " will be resume.";
-                    query_ctx->set_memory_sufficient(true);
-                    query_it = queries_list.erase(query_it);
-                    continue;
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::DISABLED) {
+                    // If the slot memory policy is not enabled, spill directly.
+                    // Another query may be using too much memory, but we do not
+                    // encourage running with the slot memory policy disabled.
+                    // TODO: should kill the query that exceeds the limit.
+                    bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_,
+                                                          query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
                 } else {
-                    ++query_it;
-                    continue;
+                    // Should not put the query back to the task scheduler immediately: when the
+                    // wg's memory is insufficient and its flag has just been set, other queries
+                    // may not free memory very quickly.
+                    if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
+                        // Set wg's memory to insufficient, then add it back to the task scheduler to run.
+                        LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                                  << " will be resumed.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
                 }
             } else {
                 // If wg's memlimit not exceed, but process memory exceed, it means cache or other metadata
@@ -557,6 +574,10 @@ int64_t WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryConte
     {
         std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
         for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
+            if (requestor->workload_group() != nullptr &&
+                iter->second->id() == requestor->workload_group()->id()) {
+                continue;
+            }
             heap.emplace(iter->second, iter->second->memory_used());
         }
     }
@@ -620,7 +641,8 @@ bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_
                         "query({}) reserve memory failed, but could not find memory that "
                         "could "
                         "release or spill to disk(memory usage:{}, limit: {})",
-                        query_id, PrettyPrinter::print_bytes(memory_usage), PrettyPrinter::print_bytes(query_ctx->get_mem_limit())));
+                        query_id, PrettyPrinter::print_bytes(memory_usage),
+                        PrettyPrinter::print_bytes(query_ctx->get_mem_limit())));
             }
         } else {
             if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
@@ -678,14 +700,15 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
                 (double)(wg->total_mem_used()) / wg_mem_limit);
     }
 
-    // If the wg enable over commit memory, then it is no need to update query memlimit
-    if (wg->enable_memory_overcommit()) {
-        return;
-    }
-    // If reached low watermark then enable load buffer limit
-    if (is_low_wartermark) {
+    // If the low watermark is reached and the wg does not enable memory overcommit,
+    // enable the load buffer limit.
+    if (is_low_wartermark && !wg->enable_memory_overcommit()) {
         wg->enable_write_buffer_limit(true);
     }
+    // Whether or not overcommit is enabled, if the user set a slot memory policy,
+    // the memtracker's memlimit will be replaced with the slot-based limit.
+    if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::DISABLED) {
+        return;
+    }
     int32_t total_used_slot_count = 0;
     int32_t total_slot_count = wg->total_query_slot_count();
     // calculate total used slot count
@@ -709,7 +732,7 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
         int64_t query_weighted_mem_limit = 0;
         int64_t expected_query_weighted_mem_limit = 0;
         // If the query enable hard limit, then it should not use the soft limit
-        if (!query_ctx->enable_mem_overcommit()) {
+        if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::FIXED) {
             if (total_slot_count < 1) {
                 LOG(WARNING)
                         << "query " << print_id(query_ctx->query_id())
@@ -740,6 +763,8 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
         // If the query is a pure load task, then should not modify its limit. Or it will reserve
         // memory failed and we did not hanle it.
         if (!query_ctx->is_pure_load_task()) {
+            // If a slot memory policy is enabled, overcommit is disabled for the query.
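The arithmetic inside the FIXED branch is elided in the hunks above; purely as an illustration of the idea (the formula below is an assumption, not the commit's code), a fixed slot policy could carve the group limit into equal slots and hand each query its slot share:

#include <cstdint>

// Illustrative only: a per-query limit under a FIXED slot policy. The names
// mirror update_queries_limit_, but the exact formula is not shown in the diff.
int64_t fixed_slot_query_limit(int64_t wg_mem_limit, int32_t total_slot_count,
                               int32_t query_slot_count) {
    if (total_slot_count < 1) {
        // update_queries_limit_ logs a warning and skips the query in this case.
        return wg_mem_limit;
    }
    return wg_mem_limit / total_slot_count * query_slot_count;
}

Under DYNAMIC, the weighting would presumably depend on total_used_slot_count instead, so memory left by idle slots can be redistributed across running queries; that computation also lives in the elided part of the hunk.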
+            query_ctx->get_mem_tracker()->set_overcommit(false);
             query_ctx->set_mem_limit(query_weighted_mem_limit);
             query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 63e454e6dbd..c2a4e97e9c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.thrift.TWgSlotMemoryPolicy;
 import org.apache.doris.thrift.TWorkloadGroupInfo;
 import org.apache.doris.thrift.TopicInfo;
 
@@ -43,6 +44,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -75,6 +77,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
 
     public static final String SPILL_THRESHOLD_HIGH_WATERMARK = "spill_threshold_high_watermark";
 
+    public static final String SLOT_MEMORY_POLICY = "slot_memory_policy";
+
     public static final String TAG = "tag";
 
     public static final String READ_BYTES_PER_SECOND = "read_bytes_per_second";
@@ -90,11 +94,17 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
             .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
             .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND)
-            .add(WRITE_BUFFER_RATIO).build();
+            .add(WRITE_BUFFER_RATIO).add(SLOT_MEMORY_POLICY).build();
 
     public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 75;
     public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 90;
     public static final int WRITE_BUFFER_RATIO_DEFAULT_VALUE = 20;
+    public static final String SLOT_MEMORY_POLICY_DEFAULT_VALUE = "disabled";
+    public static final HashSet<String> AVAILABLE_SLOT_MEMORY_POLICY_VALUES = new HashSet<String>() {{
+            add("disabled");
+            add("fixed");
+            add("dynamic");
+        }};
 
     @SerializedName(value = "id")
     private long id;
@@ -141,6 +151,13 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             this.properties.put(WRITE_BUFFER_RATIO, WRITE_BUFFER_RATIO_DEFAULT_VALUE + "");
         }
 
+        if (properties.containsKey(SLOT_MEMORY_POLICY)) {
+            String slotPolicy = properties.get(SLOT_MEMORY_POLICY);
+            this.properties.put(SLOT_MEMORY_POLICY, slotPolicy);
+        } else {
+            this.properties.put(SLOT_MEMORY_POLICY, SLOT_MEMORY_POLICY_DEFAULT_VALUE);
+        }
+
         if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
             properties.put(ENABLE_MEMORY_OVERCOMMIT, properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase());
         }
@@ -298,6 +315,14 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             }
         }
 
+        if (properties.containsKey(SLOT_MEMORY_POLICY)) {
+            String value = properties.get(SLOT_MEMORY_POLICY).toLowerCase();
+            if (!AVAILABLE_SLOT_MEMORY_POLICY_VALUES.contains(value)) {
+                throw new DdlException("The value of '" + SLOT_MEMORY_POLICY
+                        + "' must be one of disabled, fixed, dynamic.");
+            }
+        }
+
         if (properties.containsKey(SCAN_THREAD_NUM)) {
             String value = properties.get(SCAN_THREAD_NUM);
             try {
@@ -589,6 +614,18 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
         return new TPipelineWorkloadGroup().setId(id);
     }
 
+    public static TWgSlotMemoryPolicy findSlotPolicyValueByString(String slotPolicy) {
+        if (slotPolicy.equalsIgnoreCase("disabled")) {
+            return TWgSlotMemoryPolicy.DISABLED;
+        } else if (slotPolicy.equalsIgnoreCase("fixed")) {
+            return TWgSlotMemoryPolicy.FIXED;
+        } else if (slotPolicy.equalsIgnoreCase("dynamic")) {
+            return TWgSlotMemoryPolicy.DYNAMIC;
+        } else {
+            throw new RuntimeException("Could not find policy using " + slotPolicy);
+        }
+    }
+
     public TopicInfo toTopicInfo() {
         TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
         tWorkloadGroupInfo.setId(id);
@@ -613,6 +650,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
         if (writeBufferRatioStr != null) {
             tWorkloadGroupInfo.setWriteBufferRatio(Integer.parseInt(writeBufferRatioStr));
         }
+        String slotMemoryPolicyStr = properties.get(SLOT_MEMORY_POLICY);
+        if (slotMemoryPolicyStr != null) {
+            tWorkloadGroupInfo.setSlotMemoryPolicy(findSlotPolicyValueByString(slotMemoryPolicyStr));
+        }
         String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT);
         if (memOvercommitStr != null) {
             tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
index f99fd4f3526..872a3d17b41 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadgroup;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.proc.BaseProcResult;
+import org.apache.doris.thrift.TWgSlotMemoryPolicy;
 
 import com.google.common.collect.Maps;
 import org.junit.Assert;
@@ -87,4 +88,23 @@ public class WorkloadGroupTest {
         List<List<String>> rows = result.getRows();
         Assert.assertEquals(1, rows.size());
     }
+
+    @Test
+    public void testPolicyToString() {
+        TWgSlotMemoryPolicy p1 = WorkloadGroup.findSlotPolicyValueByString("fixed");
+        Assert.assertEquals(p1, TWgSlotMemoryPolicy.FIXED);
+        TWgSlotMemoryPolicy p2 = WorkloadGroup.findSlotPolicyValueByString("dynamic");
+        Assert.assertEquals(p2, TWgSlotMemoryPolicy.DYNAMIC);
+        TWgSlotMemoryPolicy p3 = WorkloadGroup.findSlotPolicyValueByString("disabled");
+        Assert.assertEquals(p3, TWgSlotMemoryPolicy.DISABLED);
+        TWgSlotMemoryPolicy p4 = WorkloadGroup.findSlotPolicyValueByString("disableD");
+        Assert.assertEquals(p4, TWgSlotMemoryPolicy.DISABLED);
+        boolean hasException = false;
+        try {
+            WorkloadGroup.findSlotPolicyValueByString("disableDa");
+        } catch (RuntimeException e) {
+            hasException = true;
+        }
+        Assert.assertEquals(hasException, true);
+    }
 }
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 6b41cbc14b7..c016d9ba5c8 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -253,6 +253,12 @@ enum TTopicInfoType {
     WORKLOAD_SCHED_POLICY = 2
 }
 
+enum TWgSlotMemoryPolicy {
+    DISABLED = 0,
+    FIXED = 1,
+    DYNAMIC = 2
+}
+
 struct TWorkloadGroupInfo {
   1: optional i64 id
   2: optional string name
@@ -272,6 +278,7 @@ struct TWorkloadGroupInfo {
   16: optional string tag
   17: optional i32 total_query_slot_count
   18: optional i32 write_buffer_ratio
+  19: optional TWgSlotMemoryPolicy slot_memory_policy
 }
 
 enum TWorkloadMetricType {
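Since field 19 is optional, the thrift-generated C++ setter also flips the matching __isset bit, which is exactly what the parse_topic_info guard in workload_group.cpp checks. A small usage sketch, assuming the generated header lands under gen_cpp as is usual for Doris (the id and name values are hypothetical):

#include <gen_cpp/BackendService_types.h> // generated from BackendService.thrift

doris::TWorkloadGroupInfo make_group_info() {
    doris::TWorkloadGroupInfo info;
    info.__set_id(10001);  // hypothetical workload group id
    info.__set_name("g1"); // hypothetical group name
    // __set_slot_memory_policy also sets __isset.slot_memory_policy = true,
    // so a BE running this commit picks the value up instead of DISABLED.
    info.__set_slot_memory_policy(doris::TWgSlotMemoryPolicy::FIXED);
    return info;
}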