This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 5ee7e733137 Spill and reserve (#46442)
5ee7e733137 is described below

commit 5ee7e73313712d444a9e24e0fe1547c10480e8de
Author: yiguolei <guo...@selectdb.com>
AuthorDate: Mon Jan 6 15:22:37 2025 +0800

    Spill and reserve (#46442)
    
    ### What problem does this PR solve?
    
    1. Fix the log4j format error caused by a `%` followed directly by other characters in log messages.
    2. Change the workload group's default low watermark to 75% and high watermark to 85% to make
    spilling to disk more stable.
    3. Treat exec_mem_limit as a hard limit if the user sets it.
---
 .../rowset/segment_v2/inverted_index_reader.cpp    |  8 +++++---
 be/src/pipeline/exec/file_scan_operator.cpp        |  2 +-
 be/src/runtime/memory/mem_tracker_limiter.cpp      |  2 ++
 be/src/runtime/memory/mem_tracker_limiter.h        | 22 +++++++++++++---------
 be/src/runtime/query_context.cpp                   |  5 +++--
 be/src/runtime/query_context.h                     | 13 +++++--------
 be/src/runtime/workload_group/workload_group.cpp   | 10 +++++-----
 .../workload_group/workload_group_manager.cpp      | 10 ++++------
 .../workload_group/workload_group_manager_test.cpp |  2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  | 12 ++----------
 .../resource/workloadgroup/WorkloadGroup.java      |  4 ++--
 .../data/workload_manager_p0/test_curd_wlg.out     |  6 +++---
 12 files changed, 46 insertions(+), 50 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index 4fe45283cd2..44c038aec9c 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -1158,9 +1158,11 @@ Status InvertedIndexIterator::read_from_inverted_index(
             RETURN_IF_ERROR(
                     try_read_from_inverted_index(column_name, query_value, 
query_type, &hit_count));
             if (hit_count > segment_num_rows * query_bkd_limit_percent / 100) {
-                return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>(
-                        "hit count: {}, bkd inverted reached limit {}%, 
segment num rows:{}",
-                        hit_count, query_bkd_limit_percent, segment_num_rows);
+                return Status::
+                        Error<ErrorCode::INVERTED_INDEX_BYPASS>(
+                                "hit count: {}, bkd inverted reached limit {}% 
, segment num "
+                                "rows:{}", // add blackspace after % to avoid 
log4j format bug
+                                hit_count, query_bkd_limit_percent, 
segment_num_rows);
             }
         }
     }
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 1571b585545..afd548f35f4 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -63,7 +63,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
     auto wg_ptr = state->get_query_ctx()->workload_group();
     _max_scanners =
             config::doris_scanner_thread_pool_thread_num / 
state->query_parallel_instance_num();
-    if (wg_ptr && !state->get_query_ctx()->enable_mem_overcommit()) {
+    if (wg_ptr) {
         const auto total_slots = wg_ptr->total_query_slot_count();
         const auto query_slots = state->get_query_ctx()->get_slot_count();
         _max_scanners = _max_scanners * query_slots / total_slots;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index cc4218d7653..e65dea61394 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -88,6 +88,8 @@ std::shared_ptr<MemTrackerLimiter> 
MemTrackerLimiter::create_shared(MemTrackerLi
     auto tracker = std::make_shared<MemTrackerLimiter>(type, label, 
byte_limit);
     // Write tracker is only used to tracker the size, so limit == -1
     auto write_tracker = std::make_shared<MemTrackerLimiter>(type, "Memtable" 
+ label, -1);
+    // Memtable has a separate logic to deal with memory flush, so that should 
not check the limit in memtracker.
+    write_tracker->set_enable_reserve_memory(true);
     tracker->_write_tracker.swap(write_tracker);
 #ifndef BE_TEST
     DCHECK(ExecEnv::tracking_memory());
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 64a8b97eb32..8c67cc6197a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -139,7 +139,6 @@ public:
     ~MemTrackerLimiter();
 
     Type type() const { return _type; }
-    void set_overcommit(bool enable) { _enable_overcommit = enable; }
     const std::string& label() const { return _label; }
     std::shared_ptr<QueryStatistics> get_query_statistics() { return 
_query_statistics; }
     int64_t group_num() const { return _group_num; }
@@ -217,9 +216,7 @@ public:
         if (UNLIKELY(bytes == 0)) {
             return true;
         }
-        // If enable overcommit, then the limit is useless, use a very large 
value as limit
-        bool rt = _mem_counter.try_add(
-                bytes, _enable_overcommit ? 
std::numeric_limits<int64_t>::max() : _limit.load());
+        bool rt = _mem_counter.try_add(bytes, _limit.load());
         if (rt && _query_statistics) {
             _query_statistics->set_max_peak_memory_bytes(peak_consumption());
             _query_statistics->set_current_used_memory_bytes(consumption());
@@ -292,6 +289,7 @@ public:
     void add_address_sanitizers(void* buf, size_t size);
     void remove_address_sanitizers(void* buf, size_t size);
     bool is_group_commit_load {false};
+    void set_enable_reserve_memory(bool enabled) { _enable_reserve_memory = 
enabled; }
 
 private:
     // only for Type::QUERY or Type::LOAD.
@@ -318,7 +316,6 @@ private:
     */
 
     Type _type;
-    bool _enable_overcommit = true;
 
     // label used in the make snapshot, not guaranteed unique.
     std::string _label;
@@ -328,6 +325,8 @@ private:
     MemCounter _mem_counter;
     MemCounter _reserved_counter;
 
+    bool _enable_reserve_memory = false;
+
     // Limit on memory consumption, in bytes.
     std::atomic<int64_t> _limit;
 
@@ -377,16 +376,21 @@ inline void MemTrackerLimiter::cache_consume(int64_t 
bytes) {
 }
 
 inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
-    /*
-    if (bytes <= 0 || _enable_overcommit) {
+    // Do not enable check limit, because reserve process will check it.
+    if (bytes <= 0 || _enable_reserve_memory) {
         return Status::OK();
     }
-    // check limit should ignore memtable size, because it is treated as a 
cache
+
+    // If reserve not enabled, then should check limit here to kill the query 
when limit exceed.
+    // For insert into select or pure load job, its memtable is accounted in a 
seperate memtracker limiter,
+    // and its reserve is set to true. So that it will not reach this logic.
+    // Only query and load job has exec_mem_limit and the _limit > 0, other 
memtracker limiter's _limit is -1 so
+    // it will not take effect.
     if (_limit > 0 && consumption() + bytes > _limit) {
         return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, 
{}",
                                                        
MemCounter::print_bytes(bytes),
                                                        
tracker_limit_exceeded_str()));
-    }*/
+    }
     return Status::OK();
 }
 
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 241cefb8942..ea950d13555 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -146,9 +146,10 @@ void QueryContext::_init_query_mem_tracker() {
     if (_query_options.__isset.is_report_success && 
_query_options.is_report_success) {
         query_mem_tracker->enable_print_log_usage();
     }
-    query_mem_tracker->set_overcommit(enable_mem_overcommit());
+    
query_mem_tracker->set_enable_reserve_memory(_query_options.__isset.enable_reserve_memory
 &&
+                                                 
_query_options.enable_reserve_memory);
     _user_set_mem_limit = bytes_limit;
-    _expected_mem_limit = _user_set_mem_limit;
+    _adjusted_mem_limit = bytes_limit;
 }
 
 QueryContext::~QueryContext() {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 96883acd8c4..a50d6041d35 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -248,12 +248,13 @@ public:
 
     int64_t get_mem_limit() const { return query_mem_tracker->limit(); }
 
-    void set_expected_mem_limit(int64_t new_mem_limit) {
-        _expected_mem_limit = std::min<int64_t>(new_mem_limit, 
_user_set_mem_limit);
+    // The new memlimit should be less than user set memlimit.
+    void set_adjusted_mem_limit(int64_t new_mem_limit) {
+        _adjusted_mem_limit = std::min<int64_t>(new_mem_limit, 
_user_set_mem_limit);
     }
 
     // Expected mem limit is the limit when workload group reached limit.
-    int64_t expected_mem_limit() { return _expected_mem_limit; }
+    int64_t adjusted_mem_limit() { return _adjusted_mem_limit; }
 
     std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return 
query_mem_tracker; }
 
@@ -261,10 +262,6 @@ public:
         return _query_options.__isset.query_slot_count ? 
_query_options.query_slot_count : 1;
     }
 
-    bool enable_mem_overcommit() const {
-        return _query_options.__isset.enable_mem_overcommit ? 
_query_options.enable_mem_overcommit
-                                                            : false;
-    }
     DescriptorTbl* desc_tbl = nullptr;
     bool set_rsc_info = false;
     std::string user;
@@ -395,7 +392,7 @@ private:
     std::atomic<int64_t> _paused_period_secs = 0;
     std::atomic<bool> _low_memory_mode = false;
     int64_t _user_set_mem_limit = 0;
-    std::atomic<int64_t> _expected_mem_limit = 0;
+    std::atomic<int64_t> _adjusted_mem_limit = 0;
 
     std::mutex _profile_mutex;
     timespec _query_arrival_timestamp;
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 48d1a3c15b7..f06e416b8ce 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -93,7 +93,7 @@ std::string WorkloadGroup::debug_string() const {
     return fmt::format(
             "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
             "total_query_slot_count = {}, "
-            "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= 
{}%, "
+            "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= 
{}% , " // add a blackspace after % to avoid log4j format bugs
             "enable_memory_overcommit = {}, total_mem_used = {} 
(write_buffer_size={}),"
             "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, 
cpu_hard_limit = {}, "
             "scan_thread_num = "
@@ -133,17 +133,17 @@ std::string WorkloadGroup::memory_debug_string() const {
     return fmt::format(
             "WorkloadGroup[id = {}, name = {}, version = {},"
             "total_query_slot_count = {}, "
-            "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= 
{}%, "
+            "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= 
{}% , "
             "enable_memory_overcommit = {}, total_mem_used = {} 
(write_buffer_size={}),"
-            "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, "
-            "memory_low_watermark={}, memory_high_watermark={}, 
is_shutdown={}, query_num={}]",
+            "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}% , "
+            "memory_low_watermark={}% , memory_high_watermark={}% , 
is_shutdown={}, query_num={}]",
             _id, _name, _version, _total_query_slot_count,
             PrettyPrinter::print(_memory_limit, TUnit::BYTES), 
to_string(_slot_mem_policy),
             _load_buffer_ratio, _enable_memory_overcommit ? "true" : "false",
             PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
             PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES),
             PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), 
TUnit::BYTES),
-            mem_used_ratio, _memory_low_watermark, _memory_high_watermark, 
_is_shutdown,
+            std::trunc(mem_used_ratio), _memory_low_watermark, 
_memory_high_watermark, _is_shutdown,
             _query_ctxs.size());
 }
 
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index fae389d6341..7eb31b28400 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -379,15 +379,15 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 // check if the reserve is too large, if it is too large,
                 // should set the query's limit only.
                 // Check the query's reserve with expected limit.
-                if (query_ctx->expected_mem_limit() <
+                if (query_ctx->adjusted_mem_limit() <
                     query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
-                    query_ctx->set_mem_limit(query_ctx->expected_mem_limit());
+                    query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
                     query_ctx->set_memory_sufficient(true);
                     LOG(INFO) << "Workload group memory reserve failed because 
"
                               << query_ctx->debug_string() << " reserve size "
                               << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
                               << " is too large, set hard limit to "
-                              << 
PrettyPrinter::print_bytes(query_ctx->expected_mem_limit())
+                              << 
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
                               << " and resume running.";
                     query_it = queries_list.erase(query_it);
                     continue;
@@ -865,10 +865,8 @@ void 
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
         // If the query is a pure load task, then should not modify its limit. 
Or it will reserve
         // memory failed and we did not hanle it.
         if (!query_ctx->is_pure_load_task()) {
-            // If slot memory policy is enabled, then overcommit is disabled.
-            query_ctx->get_mem_tracker()->set_overcommit(false);
             query_ctx->set_mem_limit(query_weighted_mem_limit);
-            
query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit);
+            
query_ctx->set_adjusted_mem_limit(expected_query_weighted_mem_limit);
         }
     }
     LOG_EVERY_T(INFO, 60) << debug_msg;
diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp 
b/be/test/runtime/workload_group/workload_group_manager_test.cpp
index 17f4569f39b..e49d502fd25 100644
--- a/be/test/runtime/workload_group/workload_group_manager_test.cpp
+++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp
@@ -169,7 +169,7 @@ TEST_F(WorkloadGroupManagerTest, query_exceed) {
     ASSERT_EQ(query_context->is_cancelled(), true) << "query should be 
canceled";
 }
 
-// if (query_ctx->expected_mem_limit() <
+// if (query_ctx->adjusted_mem_limit() <
 //                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_)
 TEST_F(WorkloadGroupManagerTest, wg_exceed1) {
     auto wg = _wg_manager->get_or_create_workload_group({});
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 92a01fbf9a2..a1f5ababb1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -658,8 +658,6 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String QUERY_SLOT_COUNT = "query_slot_count";
 
-    public static final String ENABLE_MEM_OVERCOMMIT = "enable_mem_overcommit";
-
     public static final String MAX_COLUMN_READER_NUM = "max_column_reader_num";
 
     public static final String USE_MAX_LENGTH_OF_VARCHAR_IN_CTAS = 
"use_max_length_of_varchar_in_ctas";
@@ -757,9 +755,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = INSERT_VISIBLE_TIMEOUT_MS, needForward = true)
     public long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS;
 
-    // max memory used on every backend.
+    // max memory used on every backend. Default value to 100G.
     @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, needForward = true)
-    public long maxExecMemByte = 2147483648L;
+    public long maxExecMemByte = 100147483648L;
 
     @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT,
             description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block",
@@ -866,11 +864,6 @@ public class SessionVariable implements Serializable, 
Writable {
         }
     }
 
-    @VariableMgr.VarAttr(name = ENABLE_MEM_OVERCOMMIT, needForward = true, 
description = {
-            "是否通过硬限的方式来计算每个Query的内存资源",
-            "Whether to calculate the memory resources of each query by hard 
limit"})
-    public boolean enableMemOvercommit = true;
-
     @VariableMgr.VarAttr(name = MAX_COLUMN_READER_NUM)
     public int maxColumnReaderNum = 20000;
 
@@ -4055,7 +4048,6 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
         tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
         tResult.setQuerySlotCount(wgQuerySlotCount);
-        tResult.setEnableMemOvercommit(enableMemOvercommit);
 
         tResult.setKeepCarriageReturn(keepCarriageReturn);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 7852d3b6302..5fe16f6c364 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -134,8 +134,8 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
         ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(SCAN_THREAD_NUM, "-1");
         ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_REMOTE_SCAN_THREAD_NUM, "-1");
         ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MIN_REMOTE_SCAN_THREAD_NUM, "-1");
-        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LOW_WATERMARK, "80%");
-        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_HIGH_WATERMARK, "95%");
+        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LOW_WATERMARK, "75%");
+        ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_HIGH_WATERMARK, "85%");
         ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(TAG, "");
         ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(READ_BYTES_PER_SECOND, "-1");
         ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(REMOTE_READ_BYTES_PER_SECOND, 
"-1");
diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out 
b/regression-test/data/workload_manager_p0/test_curd_wlg.out
index 0e05adff526..68c7f735096 100644
--- a/regression-test/data/workload_manager_p0/test_curd_wlg.out
+++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out
@@ -144,11 +144,11 @@ test_wg_priv_role1        test_wg_priv_g1 Usage_priv      
NO
 -- !select_wgp_12 --
 
 -- !select_default_val_wg_1 --
-default_val_wg -1      -1      true    2147483647      0       0       -1      
-1      -1      -1      80%     95%             -1      -1
+default_val_wg -1      -1      true    2147483647      0       0       -1      
-1      -1      -1      75%     85%             -1      -1
 
 -- !select_default_val_wg_2 --
-default_val_wg 1024    1%      true    100     1       123     1%      1       
12      10      80%     95%     abc     123     10
+default_val_wg 1024    1%      true    100     1       123     1%      1       
12      10      75%     85%     abc     123     10
 
 -- !select_default_val_wg_3 --
-default_val_wg -1      -1      true    2147483647      0       0       -1      
-1      -1      -1      80%     95%             -1      -1
+default_val_wg -1      -1      true    2147483647      0       0       -1      
-1      -1      -1      75%     85%             -1      -1
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to