This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit 974bbfc458e62ad93708226c77fa2580d3bfc537 Author: yiguolei <676222...@qq.com> AuthorDate: Sun Oct 27 21:00:35 2024 +0800 rename to write buffer limit (#42511) f Issue Number: close #xxx <!--Describe your changes.--> Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/olap/memtable_memory_limiter.cpp | 2 +- be/src/runtime/workload_group/workload_group.cpp | 14 +++++----- be/src/runtime/workload_group/workload_group.h | 10 +++---- .../workload_group/workload_group_manager.cpp | 14 +++++----- .../resource/workloadgroup/WorkloadGroup.java | 32 +++++++++++----------- gensrc/thrift/BackendService.thrift | 2 +- 6 files changed, 37 insertions(+), 37 deletions(-) diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index 2071f814768..fe483706127 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -118,7 +118,7 @@ void MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPt // Should releae memory quickly. using namespace std::chrono_literals; int32_t sleep_times = 10; - while (wg != nullptr && wg->enable_load_buffer_limit() && sleep_times > 0) { + while (wg != nullptr && wg->enable_write_buffer_limit() && sleep_times > 0) { std::this_thread::sleep_for(100ms); --sleep_times; } diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 2cc232e97df..888cd256332 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -58,7 +58,7 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info) _name(tg_info.name), _version(tg_info.version), _memory_limit(tg_info.memory_limit), - _load_buffer_ratio(tg_info.load_buffer_ratio), + _load_buffer_ratio(tg_info.write_buffer_ratio), _enable_memory_overcommit(tg_info.enable_memory_overcommit), _cpu_share(tg_info.cpu_share), _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM), @@ -94,7 +94,7 @@ std::string WorkloadGroup::debug_string() const { return fmt::format( "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, " "total_query_slot_count={}, " - "memory_limit = {}, load_buffer_ratio= {}%" + "memory_limit = {}, write_buffer_ratio= {}%" "enable_memory_overcommit = {}, total_mem_used = {}," "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, spill_low_watermark = " "{}, spill_high_watermark = {},cpu_hard_limit = {}, scan_thread_num = " @@ -175,7 +175,7 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { _scan_bytes_per_second = tg_info.read_bytes_per_second; _remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second; _total_query_slot_count = tg_info.total_query_slot_count; - _load_buffer_ratio = tg_info.load_buffer_ratio; + _load_buffer_ratio = tg_info.write_buffer_ratio; } else { return; } @@ -547,9 +547,9 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info( } // 17 load buffer memory limit - int load_buffer_ratio = LOAD_BUFFER_RATIO_DEFAULT_VALUE; - if (tworkload_group_info.__isset.load_buffer_ratio) { - load_buffer_ratio = tworkload_group_info.load_buffer_ratio; + int write_buffer_ratio = LOAD_BUFFER_RATIO_DEFAULT_VALUE; + if (tworkload_group_info.__isset.write_buffer_ratio) { + write_buffer_ratio = tworkload_group_info.write_buffer_ratio; } return {.id = tg_id, @@ -568,7 +568,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info( .read_bytes_per_second = read_bytes_per_second, .remote_read_bytes_per_second = remote_read_bytes_per_second, .total_query_slot_count = total_query_slot_count, - .load_buffer_ratio = load_buffer_ratio}; + .write_buffer_ratio = write_buffer_ratio}; } void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index c88f1e3604b..fd51d96c6c9 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -82,9 +82,9 @@ public: int64_t load_mem_used() const { return _load_buffer_size; } - void enable_load_buffer_limit(bool enable_limit) { _enable_load_buffer_limit = enable_limit; } + void enable_write_buffer_limit(bool enable_limit) { _enable_write_buffer_limit = enable_limit; } - bool enable_load_buffer_limit() const { return _enable_load_buffer_limit; } + bool enable_write_buffer_limit() const { return _enable_write_buffer_limit; } // make memory snapshots and refresh total memory used at the same time. int64_t refresh_memory_usage(); @@ -208,7 +208,7 @@ public: return _memtable_flush_pool.get(); } - int64_t load_buffer_limit() const { return _memory_limit * _load_buffer_ratio / 100; } + int64_t write_buffer_limit() const { return _memory_limit * _load_buffer_ratio / 100; } int64_t free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* profile); @@ -222,7 +222,7 @@ private: // If the wg's memory reached high water mark, then the load buffer // will be restricted to this limit. int64_t _load_buffer_ratio = 0; - std::atomic<bool> _enable_load_buffer_limit = false; + std::atomic<bool> _enable_write_buffer_limit = false; std::atomic_int64_t _total_mem_used = 0; // bytes std::atomic_int64_t _load_buffer_size = 0; @@ -283,7 +283,7 @@ struct WorkloadGroupInfo { const int read_bytes_per_second = -1; const int remote_read_bytes_per_second = -1; const int total_query_slot_count = 0; - const int load_buffer_ratio = 0; + const int write_buffer_ratio = 0; // log cgroup cpu info uint64_t cgroup_cpu_shares = 0; int cgroup_cpu_hard_limit = 0; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 784c09555c4..25c983b98f3 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -350,12 +350,12 @@ void WorkloadGroupMgr::handle_paused_queries() { } if (flushed_memtable_bytes > 0) { // Flushed some memtable, just wait flush finished and not do anything more. - wg->enable_load_buffer_limit(true); + wg->enable_write_buffer_limit(true); ++query_it; continue; } else { // If could not revoke memory by flush memtable, then disable load buffer limit - wg->enable_load_buffer_limit(false); + wg->enable_write_buffer_limit(false); } if (!has_changed_hard_limit) { update_queries_limit_(wg, true); @@ -447,7 +447,7 @@ void WorkloadGroupMgr::handle_paused_queries() { } // Not need waiting flush memtable and below low watermark disable load buffer limit if (flushed_memtable_bytes <= 0 && !is_low_wartermark) { - wg->enable_load_buffer_limit(false); + wg->enable_write_buffer_limit(false); } if (queries_list.empty()) { @@ -475,7 +475,7 @@ int64_t WorkloadGroupMgr::flush_memtable_from_current_group_( // For example, streamload, it will not reserve many memory, but it will occupy many memtable memory. // TODO: 0.2 should be a workload group properties. For example, the group is optimized for load,then the value // should be larged, if the group is optimized for query, then the value should be smaller. - int64_t max_wg_memtable_bytes = wg->load_buffer_limit(); + int64_t max_wg_memtable_bytes = wg->write_buffer_limit(); if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes > max_wg_memtable_bytes) { // There are many table in flush queue, just waiting them flush finished. @@ -655,8 +655,8 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha (int64_t)(wg_mem_limit * wg->spill_threshold_high_water_mark() * 1.0 / 100); int64_t memtable_usage = wg->load_mem_used(); int64_t wg_high_water_mark_except_load = wg_high_water_mark_limit; - if (memtable_usage > wg->load_buffer_limit()) { - wg_high_water_mark_except_load = wg_high_water_mark_limit - wg->load_buffer_limit(); + if (memtable_usage > wg->write_buffer_limit()) { + wg_high_water_mark_except_load = wg_high_water_mark_limit - wg->write_buffer_limit(); } else { wg_high_water_mark_except_load = wg_high_water_mark_limit - memtable_usage - 10 * 1024 * 1024; @@ -679,7 +679,7 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha } // If reached low watermark then enable load buffer limit if (is_low_wartermark) { - wg->enable_load_buffer_limit(true); + wg->enable_write_buffer_limit(true); } int32_t total_used_slot_count = 0; int32_t total_slot_count = wg->total_query_slot_count(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index b428a058277..63e454e6dbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -57,7 +57,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { public static final String ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit"; - public static final String LOAD_BUFFER_RATIO = "load_buffer_ratio"; + public static final String WRITE_BUFFER_RATIO = "write_buffer_ratio"; public static final String MAX_CONCURRENCY = "max_concurrency"; @@ -90,11 +90,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM) .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK) .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND) - .add(LOAD_BUFFER_RATIO).build(); + .add(WRITE_BUFFER_RATIO).build(); public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 75; public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 90; - public static final int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20; + public static final int WRITE_BUFFER_RATIO_DEFAULT_VALUE = 20; @SerializedName(value = "id") private long id; @@ -131,14 +131,14 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { memoryLimitString.substring(0, memoryLimitString.length() - 1)); } - if (properties.containsKey(LOAD_BUFFER_RATIO)) { - String loadBufLimitStr = properties.get(LOAD_BUFFER_RATIO); + if (properties.containsKey(WRITE_BUFFER_RATIO)) { + String loadBufLimitStr = properties.get(WRITE_BUFFER_RATIO); if (loadBufLimitStr.endsWith("%")) { loadBufLimitStr = loadBufLimitStr.substring(0, loadBufLimitStr.length() - 1); } - this.properties.put(LOAD_BUFFER_RATIO, loadBufLimitStr); + this.properties.put(WRITE_BUFFER_RATIO, loadBufLimitStr); } else { - this.properties.put(LOAD_BUFFER_RATIO, LOAD_BUFFER_RATIO_DEFAULT_VALUE + ""); + this.properties.put(WRITE_BUFFER_RATIO, WRITE_BUFFER_RATIO_DEFAULT_VALUE + ""); } if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) { @@ -271,13 +271,13 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } } - if (properties.containsKey(LOAD_BUFFER_RATIO)) { - String memoryLimit = properties.get(LOAD_BUFFER_RATIO); + if (properties.containsKey(WRITE_BUFFER_RATIO)) { + String memoryLimit = properties.get(WRITE_BUFFER_RATIO); if (!memoryLimit.endsWith("%")) { - throw new DdlException(LOAD_BUFFER_RATIO + " " + memoryLimit + throw new DdlException(WRITE_BUFFER_RATIO + " " + memoryLimit + " requires a percentage and ends with a '%'"); } - String memLimitErr = LOAD_BUFFER_RATIO + " " + memoryLimit + String memLimitErr = WRITE_BUFFER_RATIO + " " + memoryLimit + " requires a positive int number."; try { if (Integer.parseInt(memoryLimit.substring(0, memoryLimit.length() - 1)) < 0) { @@ -517,11 +517,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { row.add("-1"); } else if (MEMORY_LIMIT.equals(key) && !properties.containsKey(key)) { row.add("0%"); - } else if (LOAD_BUFFER_RATIO.equals(key)) { + } else if (WRITE_BUFFER_RATIO.equals(key)) { if (properties.containsKey(key)) { row.add(properties.get(key) + "%"); } else { - row.add(LOAD_BUFFER_RATIO_DEFAULT_VALUE + "%"); + row.add(WRITE_BUFFER_RATIO_DEFAULT_VALUE + "%"); } } else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) { row.add("true"); @@ -609,9 +609,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { if (memLimitStr != null) { tWorkloadGroupInfo.setMemLimit(memLimitStr); } - String loadBufferRatioStr = properties.get(LOAD_BUFFER_RATIO); - if (loadBufferRatioStr != null) { - tWorkloadGroupInfo.setLoadBufferRatio(Integer.parseInt(loadBufferRatioStr)); + String writeBufferRatioStr = properties.get(WRITE_BUFFER_RATIO); + if (writeBufferRatioStr != null) { + tWorkloadGroupInfo.setWriteBufferRatio(Integer.parseInt(writeBufferRatioStr)); } String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT); if (memOvercommitStr != null) { diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index fa4fedcc9bb..6b41cbc14b7 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -271,7 +271,7 @@ struct TWorkloadGroupInfo { 15: optional i64 remote_read_bytes_per_second 16: optional string tag 17: optional i32 total_query_slot_count - 18: optional i32 load_buffer_ratio + 18: optional i32 write_buffer_ratio } enum TWorkloadMetricType { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org