This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 5ee7e733137 Spill and reserve (#46442) 5ee7e733137 is described below commit 5ee7e73313712d444a9e24e0fe1547c10480e8de Author: yiguolei <guo...@selectdb.com> AuthorDate: Mon Jan 6 15:22:37 2025 +0800 Spill and reserve (#46442) ### What problem does this PR solve? 1. Fix a log4j format error by adding a space after `%` in log messages. 2. Change the workload group's default memory low watermark to 75% and high watermark to 85% (previously 80% and 95%) to make spilling to disk more stable. 3. Treat exec_mem_limit as a hard limit when the user sets it. --- .../rowset/segment_v2/inverted_index_reader.cpp | 8 +++++--- be/src/pipeline/exec/file_scan_operator.cpp | 2 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 2 ++ be/src/runtime/memory/mem_tracker_limiter.h | 22 +++++++++++++--------- be/src/runtime/query_context.cpp | 5 +++-- be/src/runtime/query_context.h | 13 +++++-------- be/src/runtime/workload_group/workload_group.cpp | 10 +++++----- .../workload_group/workload_group_manager.cpp | 10 ++++------ .../workload_group/workload_group_manager_test.cpp | 2 +- .../java/org/apache/doris/qe/SessionVariable.java | 12 ++---------- .../resource/workloadgroup/WorkloadGroup.java | 4 ++-- .../data/workload_manager_p0/test_curd_wlg.out | 6 +++--- 12 files changed, 46 insertions(+), 50 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp index 4fe45283cd2..44c038aec9c 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp @@ -1158,9 +1158,11 @@ Status InvertedIndexIterator::read_from_inverted_index( RETURN_IF_ERROR( try_read_from_inverted_index(column_name, query_value, query_type, &hit_count)); if (hit_count > segment_num_rows * query_bkd_limit_percent / 100) { - return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>( - "hit count: {}, bkd inverted reached limit {}%, segment num rows:{}", - hit_count, query_bkd_limit_percent, 
segment_num_rows); + return Status:: + Error<ErrorCode::INVERTED_INDEX_BYPASS>( + "hit count: {}, bkd inverted reached limit {}% , segment num " + "rows:{}", // add blackspace after % to avoid log4j format bug + hit_count, query_bkd_limit_percent, segment_num_rows); } } } diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 1571b585545..afd548f35f4 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -63,7 +63,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, auto wg_ptr = state->get_query_ctx()->workload_group(); _max_scanners = config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num(); - if (wg_ptr && !state->get_query_ctx()->enable_mem_overcommit()) { + if (wg_ptr) { const auto total_slots = wg_ptr->total_query_slot_count(); const auto query_slots = state->get_query_ctx()->get_slot_count(); _max_scanners = _max_scanners * query_slots / total_slots; diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index cc4218d7653..e65dea61394 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -88,6 +88,8 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerLimiter::create_shared(MemTrackerLi auto tracker = std::make_shared<MemTrackerLimiter>(type, label, byte_limit); // Write tracker is only used to tracker the size, so limit == -1 auto write_tracker = std::make_shared<MemTrackerLimiter>(type, "Memtable" + label, -1); + // Memtable has a separate logic to deal with memory flush, so that should not check the limit in memtracker. 
+ write_tracker->set_enable_reserve_memory(true); tracker->_write_tracker.swap(write_tracker); #ifndef BE_TEST DCHECK(ExecEnv::tracking_memory()); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 64a8b97eb32..8c67cc6197a 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -139,7 +139,6 @@ public: ~MemTrackerLimiter(); Type type() const { return _type; } - void set_overcommit(bool enable) { _enable_overcommit = enable; } const std::string& label() const { return _label; } std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; } int64_t group_num() const { return _group_num; } @@ -217,9 +216,7 @@ public: if (UNLIKELY(bytes == 0)) { return true; } - // If enable overcommit, then the limit is useless, use a very large value as limit - bool rt = _mem_counter.try_add( - bytes, _enable_overcommit ? std::numeric_limits<int64_t>::max() : _limit.load()); + bool rt = _mem_counter.try_add(bytes, _limit.load()); if (rt && _query_statistics) { _query_statistics->set_max_peak_memory_bytes(peak_consumption()); _query_statistics->set_current_used_memory_bytes(consumption()); @@ -292,6 +289,7 @@ public: void add_address_sanitizers(void* buf, size_t size); void remove_address_sanitizers(void* buf, size_t size); bool is_group_commit_load {false}; + void set_enable_reserve_memory(bool enabled) { _enable_reserve_memory = enabled; } private: // only for Type::QUERY or Type::LOAD. @@ -318,7 +316,6 @@ private: */ Type _type; - bool _enable_overcommit = true; // label used in the make snapshot, not guaranteed unique. std::string _label; @@ -328,6 +325,8 @@ private: MemCounter _mem_counter; MemCounter _reserved_counter; + bool _enable_reserve_memory = false; + // Limit on memory consumption, in bytes. 
std::atomic<int64_t> _limit; @@ -377,16 +376,21 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) { } inline Status MemTrackerLimiter::check_limit(int64_t bytes) { - /* - if (bytes <= 0 || _enable_overcommit) { + // Do not enable check limit, because reserve process will check it. + if (bytes <= 0 || _enable_reserve_memory) { return Status::OK(); } - // check limit should ignore memtable size, because it is treated as a cache + + // If reserve not enabled, then should check limit here to kill the query when limit exceed. + // For insert into select or pure load job, its memtable is accounted in a seperate memtracker limiter, + // and its reserve is set to true. So that it will not reach this logic. + // Only query and load job has exec_mem_limit and the _limit > 0, other memtracker limiter's _limit is -1 so + // it will not take effect. if (_limit > 0 && consumption() + bytes > _limit) { return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}", MemCounter::print_bytes(bytes), tracker_limit_exceeded_str())); - }*/ + } return Status::OK(); } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 241cefb8942..ea950d13555 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -146,9 +146,10 @@ void QueryContext::_init_query_mem_tracker() { if (_query_options.__isset.is_report_success && _query_options.is_report_success) { query_mem_tracker->enable_print_log_usage(); } - query_mem_tracker->set_overcommit(enable_mem_overcommit()); + query_mem_tracker->set_enable_reserve_memory(_query_options.__isset.enable_reserve_memory && + _query_options.enable_reserve_memory); _user_set_mem_limit = bytes_limit; - _expected_mem_limit = _user_set_mem_limit; + _adjusted_mem_limit = bytes_limit; } QueryContext::~QueryContext() { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 96883acd8c4..a50d6041d35 100644 --- a/be/src/runtime/query_context.h +++ 
b/be/src/runtime/query_context.h @@ -248,12 +248,13 @@ public: int64_t get_mem_limit() const { return query_mem_tracker->limit(); } - void set_expected_mem_limit(int64_t new_mem_limit) { - _expected_mem_limit = std::min<int64_t>(new_mem_limit, _user_set_mem_limit); + // The new memlimit should be less than user set memlimit. + void set_adjusted_mem_limit(int64_t new_mem_limit) { + _adjusted_mem_limit = std::min<int64_t>(new_mem_limit, _user_set_mem_limit); } // Expected mem limit is the limit when workload group reached limit. - int64_t expected_mem_limit() { return _expected_mem_limit; } + int64_t adjusted_mem_limit() { return _adjusted_mem_limit; } std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return query_mem_tracker; } @@ -261,10 +262,6 @@ public: return _query_options.__isset.query_slot_count ? _query_options.query_slot_count : 1; } - bool enable_mem_overcommit() const { - return _query_options.__isset.enable_mem_overcommit ? _query_options.enable_mem_overcommit - : false; - } DescriptorTbl* desc_tbl = nullptr; bool set_rsc_info = false; std::string user; @@ -395,7 +392,7 @@ private: std::atomic<int64_t> _paused_period_secs = 0; std::atomic<bool> _low_memory_mode = false; int64_t _user_set_mem_limit = 0; - std::atomic<int64_t> _expected_mem_limit = 0; + std::atomic<int64_t> _adjusted_mem_limit = 0; std::mutex _profile_mutex; timespec _query_arrival_timestamp; diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 48d1a3c15b7..f06e416b8ce 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -93,7 +93,7 @@ std::string WorkloadGroup::debug_string() const { return fmt::format( "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, " "total_query_slot_count = {}, " - "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= {}%, " + "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= {}% , " // 
add a blackspace after % to avoid log4j format bugs "enable_memory_overcommit = {}, total_mem_used = {} (write_buffer_size={})," "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, cpu_hard_limit = {}, " "scan_thread_num = " @@ -133,17 +133,17 @@ std::string WorkloadGroup::memory_debug_string() const { return fmt::format( "WorkloadGroup[id = {}, name = {}, version = {}," "total_query_slot_count = {}, " - "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= {}%, " + "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= {}% , " "enable_memory_overcommit = {}, total_mem_used = {} (write_buffer_size={})," - "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, " - "memory_low_watermark={}, memory_high_watermark={}, is_shutdown={}, query_num={}]", + "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}% , " + "memory_low_watermark={}% , memory_high_watermark={}% , is_shutdown={}, query_num={}]", _id, _name, _version, _total_query_slot_count, PrettyPrinter::print(_memory_limit, TUnit::BYTES), to_string(_slot_mem_policy), _load_buffer_ratio, _enable_memory_overcommit ? "true" : "false", PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES), PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES), PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), TUnit::BYTES), - mem_used_ratio, _memory_low_watermark, _memory_high_watermark, _is_shutdown, + std::trunc(mem_used_ratio), _memory_low_watermark, _memory_high_watermark, _is_shutdown, _query_ctxs.size()); } diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index fae389d6341..7eb31b28400 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -379,15 +379,15 @@ void WorkloadGroupMgr::handle_paused_queries() { // check if the reserve is too large, if it is too large, // should set the query's limit only. 
// Check the query's reserve with expected limit. - if (query_ctx->expected_mem_limit() < + if (query_ctx->adjusted_mem_limit() < query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) { - query_ctx->set_mem_limit(query_ctx->expected_mem_limit()); + query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit()); query_ctx->set_memory_sufficient(true); LOG(INFO) << "Workload group memory reserve failed because " << query_ctx->debug_string() << " reserve size " << PrettyPrinter::print_bytes(query_it->reserve_size_) << " is too large, set hard limit to " - << PrettyPrinter::print_bytes(query_ctx->expected_mem_limit()) + << PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit()) << " and resume running."; query_it = queries_list.erase(query_it); continue; @@ -865,10 +865,8 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha // If the query is a pure load task, then should not modify its limit. Or it will reserve // memory failed and we did not hanle it. if (!query_ctx->is_pure_load_task()) { - // If slot memory policy is enabled, then overcommit is disabled. 
- query_ctx->get_mem_tracker()->set_overcommit(false); query_ctx->set_mem_limit(query_weighted_mem_limit); - query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit); + query_ctx->set_adjusted_mem_limit(expected_query_weighted_mem_limit); } } LOG_EVERY_T(INFO, 60) << debug_msg; diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp b/be/test/runtime/workload_group/workload_group_manager_test.cpp index 17f4569f39b..e49d502fd25 100644 --- a/be/test/runtime/workload_group/workload_group_manager_test.cpp +++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp @@ -169,7 +169,7 @@ TEST_F(WorkloadGroupManagerTest, query_exceed) { ASSERT_EQ(query_context->is_cancelled(), true) << "query should be canceled"; } -// if (query_ctx->expected_mem_limit() < +// if (query_ctx->adjusted_mem_limit() < // query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) TEST_F(WorkloadGroupManagerTest, wg_exceed1) { auto wg = _wg_manager->get_or_create_workload_group({}); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 92a01fbf9a2..a1f5ababb1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -658,8 +658,6 @@ public class SessionVariable implements Serializable, Writable { public static final String QUERY_SLOT_COUNT = "query_slot_count"; - public static final String ENABLE_MEM_OVERCOMMIT = "enable_mem_overcommit"; - public static final String MAX_COLUMN_READER_NUM = "max_column_reader_num"; public static final String USE_MAX_LENGTH_OF_VARCHAR_IN_CTAS = "use_max_length_of_varchar_in_ctas"; @@ -757,9 +755,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = INSERT_VISIBLE_TIMEOUT_MS, needForward = true) public long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS; - // max 
memory used on every backend. + // max memory used on every backend. Default value to 100G. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, needForward = true) - public long maxExecMemByte = 2147483648L; + public long maxExecMemByte = 100147483648L; @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT, description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block", @@ -866,11 +864,6 @@ public class SessionVariable implements Serializable, Writable { } } - @VariableMgr.VarAttr(name = ENABLE_MEM_OVERCOMMIT, needForward = true, description = { - "是否通过硬限的方式来计算每个Query的内存资源", - "Whether to calculate the memory resources of each query by hard limit"}) - public boolean enableMemOvercommit = true; - @VariableMgr.VarAttr(name = MAX_COLUMN_READER_NUM) public int maxColumnReaderNum = 20000; @@ -4055,7 +4048,6 @@ public class SessionVariable implements Serializable, Writable { tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames); tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames); tResult.setQuerySlotCount(wgQuerySlotCount); - tResult.setEnableMemOvercommit(enableMemOvercommit); tResult.setKeepCarriageReturn(keepCarriageReturn); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 7852d3b6302..5fe16f6c364 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -134,8 +134,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(SCAN_THREAD_NUM, "-1"); ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_REMOTE_SCAN_THREAD_NUM, "-1"); ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MIN_REMOTE_SCAN_THREAD_NUM, "-1"); - ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LOW_WATERMARK, "80%"); - ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_HIGH_WATERMARK, "95%"); + 
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LOW_WATERMARK, "75%"); + ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_HIGH_WATERMARK, "85%"); ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(TAG, ""); ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(READ_BYTES_PER_SECOND, "-1"); ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(REMOTE_READ_BYTES_PER_SECOND, "-1"); diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out b/regression-test/data/workload_manager_p0/test_curd_wlg.out index 0e05adff526..68c7f735096 100644 --- a/regression-test/data/workload_manager_p0/test_curd_wlg.out +++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out @@ -144,11 +144,11 @@ test_wg_priv_role1 test_wg_priv_g1 Usage_priv NO -- !select_wgp_12 -- -- !select_default_val_wg_1 -- -default_val_wg -1 -1 true 2147483647 0 0 -1 -1 -1 -1 80% 95% -1 -1 +default_val_wg -1 -1 true 2147483647 0 0 -1 -1 -1 -1 75% 85% -1 -1 -- !select_default_val_wg_2 -- -default_val_wg 1024 1% true 100 1 123 1% 1 12 10 80% 95% abc 123 10 +default_val_wg 1024 1% true 100 1 123 1% 1 12 10 75% 85% abc 123 10 -- !select_default_val_wg_3 -- -default_val_wg -1 -1 true 2147483647 0 0 -1 -1 -1 -1 80% 95% -1 -1 +default_val_wg -1 -1 true 2147483647 0 0 -1 -1 -1 -1 75% 85% -1 -1 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org