This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4204f49ce4e3acec2383b438cbf4c63cc87e93c1 Author: yiguolei <676222...@qq.com> AuthorDate: Fri Oct 25 18:10:47 2024 +0800 block memtable when memory is not enough (#42418) Issue Number: close #xxx --------- Co-authored-by: yiguolei <yiguo...@gmail.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- be/src/olap/memtable.cpp | 1 + be/src/olap/memtable_memory_limiter.cpp | 56 ++- be/src/olap/memtable_memory_limiter.h | 9 +- be/src/pipeline/exec/file_scan_operator.cpp | 2 +- be/src/runtime/load_channel.h | 2 + be/src/runtime/load_channel_mgr.cpp | 3 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 15 + be/src/runtime/memory/mem_tracker_limiter.h | 26 +- be/src/runtime/memory/thread_mem_tracker_mgr.h | 18 +- be/src/runtime/query_context.cpp | 1 + be/src/runtime/query_context.h | 7 +- be/src/runtime/runtime_state.cpp | 4 + be/src/runtime/runtime_state.h | 3 + be/src/runtime/workload_group/workload_group.cpp | 145 ++++++-- be/src/runtime/workload_group/workload_group.h | 49 +-- .../workload_group/workload_group_manager.cpp | 406 +++++++++++---------- .../workload_group/workload_group_manager.h | 20 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 3 +- .../java/org/apache/doris/qe/SessionVariable.java | 14 +- .../resource/workloadgroup/WorkloadGroup.java | 51 ++- gensrc/thrift/BackendService.thrift | 1 + gensrc/thrift/PaloInternalService.thrift | 2 +- 22 files changed, 523 insertions(+), 315 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index e0f19b1624d..facbc90c450 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -78,6 +78,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem // TODO: Support ZOrderComparator in the future _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); _mem_tracker = std::make_shared<MemTracker>(); + _query_thread_context.query_mem_tracker->push_load_buffer(_mem_tracker); } void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index 7a220ef87ac..2071f814768 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -111,6 +111,21 @@ int64_t MemTableMemoryLimiter::_need_flush() { return need_flush - _queue_mem_usage; } +void MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPtr wg) { + // Some query is pending here, waiting for memtables to flush so that it can continue running, + // so we should wait here. + // Wait at most 1s, because this code is not aware of the cancel flag; if the load task is + // cancelled, it should release memory quickly. + using namespace std::chrono_literals; + int32_t sleep_times = 10; + while (wg != nullptr && wg->enable_load_buffer_limit() && sleep_times > 0) { + std::this_thread::sleep_for(100ms); + --sleep_times; + } + // Check process memory again. + handle_memtable_flush(); +} + void MemTableMemoryLimiter::handle_memtable_flush() { // Check the soft limit.
DCHECK(_load_soft_mem_limit > 0); @@ -150,20 +165,39 @@ void MemTableMemoryLimiter::handle_memtable_flush() { LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit"; } -void MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id, - int64_t need_flush_bytes) { +int64_t MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush) { std::unique_lock<std::mutex> l(_lock); - _flush_active_memtables(wg_id, need_flush_bytes); + return _flush_active_memtables(wg_id, need_flush); +} + +void MemTableMemoryLimiter::get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes, + int64_t* queue_bytes, + int64_t* flush_bytes) { + std::unique_lock<std::mutex> l(_lock); + *active_bytes = 0; + *queue_bytes = 0; + *flush_bytes = 0; + for (auto it = _writers.begin(); it != _writers.end(); ++it) { + if (auto writer = it->lock()) { + // A wg id is specified; writers whose wg id does not match it do not need to flush + if (writer->workload_group_id() != wg_id) { + continue; + } + *active_bytes += writer->active_memtable_mem_consumption(); + *queue_bytes += writer->mem_consumption(MemType::WRITE_FINISHED); + *flush_bytes += writer->mem_consumption(MemType::FLUSH); + } + } } -void MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t need_flush) { +int64_t MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t need_flush) { if (need_flush <= 0) { - return; + return 0; } _refresh_mem_tracker(); if (_active_writers.size() == 0) { - return; + return 0; } using WriterMem = std::pair<std::weak_ptr<MemTableWriter>, int64_t>; @@ -211,6 +245,7 @@ void MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t need } LOG(INFO) << "flushed " << num_flushed << " out of " << _active_writers.size() << " active writers, flushed size: " << PrettyPrinter::print_bytes(mem_flushed); + return mem_flushed; } void MemTableMemoryLimiter::refresh_mem_tracker() { @@ -245,29 +280,20 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() { _flush_mem_usage = 0; _queue_mem_usage = 0; _active_mem_usage = 0; - std::map<uint64_t, doris::MemtableUsage> wg_mem_usages; _active_writers.clear(); for (auto it = _writers.begin(); it != _writers.end();) { if (auto writer = it->lock()) { - if (wg_mem_usages.find(writer->workload_group_id()) == wg_mem_usages.end()) { - wg_mem_usages.insert({writer->workload_group_id(), {0, 0, 0}}); - } - auto& wg_mem_usage = wg_mem_usages.find(writer->workload_group_id())->second; - // The memtable is currently used by writer to insert blocks.
auto active_usage = writer->active_memtable_mem_consumption(); - wg_mem_usage.active_mem_usage += active_usage; _active_mem_usage += active_usage; if (active_usage > 0) { _active_writers.push_back(writer); } auto flush_usage = writer->mem_consumption(MemType::FLUSH); - wg_mem_usage.flush_mem_usage += flush_usage; _flush_mem_usage += flush_usage; auto write_usage = writer->mem_consumption(MemType::WRITE_FINISHED); - wg_mem_usage.queue_mem_usage += write_usage; _queue_mem_usage += write_usage; ++it; } else { diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h index 143edaa8fe5..de2fb802165 100644 --- a/be/src/olap/memtable_memory_limiter.h +++ b/be/src/olap/memtable_memory_limiter.h @@ -21,6 +21,7 @@ #include "common/status.h" #include "runtime/memory/mem_tracker.h" +#include "runtime/workload_group/workload_group.h" #include "util/countdown_latch.h" #include "util/stopwatch.hpp" @@ -37,13 +38,17 @@ public: Status init(int64_t process_mem_limit); + void handle_workload_group_memtable_flush(WorkloadGroupPtr wg); // check if the total mem consumption exceeds limit. // If yes, it will flush memtable to try to reduce memory consumption. // Every write operation will call this API to check if need flush memtable OR hang // when memory is not available. void handle_memtable_flush(); - void flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes); + int64_t flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes); + + void get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes, + int64_t* queue_bytes, int64_t* flush_bytes); void register_writer(std::weak_ptr<MemTableWriter> writer); @@ -61,7 +66,7 @@ private: bool _hard_limit_reached(); bool _load_usage_low(); int64_t _need_flush(); - void _flush_active_memtables(uint64_t wg_id, int64_t need_flush); + int64_t _flush_active_memtables(uint64_t wg_id, int64_t need_flush); void _refresh_mem_tracker(); std::mutex _lock; diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 1c8db8b446a..1571b585545 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -63,7 +63,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, auto wg_ptr = state->get_query_ctx()->workload_group(); _max_scanners = config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num(); - if (wg_ptr && state->get_query_ctx()->enable_query_slot_hard_limit()) { + if (wg_ptr && !state->get_query_ctx()->enable_mem_overcommit()) { const auto total_slots = wg_ptr->total_query_slot_count(); const auto query_slots = state->get_query_ctx()->get_slot_count(); _max_scanners = _max_scanners * query_slots / total_slots; diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 36a8f363ba9..c9ed66f8c74 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -69,6 +69,8 @@ public: bool is_high_priority() const { return _is_high_priority; } + WorkloadGroupPtr workload_group() const { return _query_thread_context.wg_wptr.lock(); } + RuntimeProfile::Counter* get_mgr_add_batch_timer() { return _mgr_add_batch_timer; } RuntimeProfile::Counter* get_handle_mem_limit_timer() { return _handle_mem_limit_timer; } diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index c53cade466b..55db6564488 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -150,7 +150,8 @@ 
Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request, // If this is a high priority load task, do not handle this. // because this may block for a while, which may lead to rpc timeout. SCOPED_TIMER(channel->get_handle_mem_limit_timer()); - ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(); + ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush( + channel->workload_group()); } // 3. add batch to load channel diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 5c79e0d37a6..487b7d48fae 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -257,6 +257,21 @@ void MemTrackerLimiter::clean_tracker_limiter_group() { #endif } +void MemTrackerLimiter::update_load_buffer_size() { + std::lock_guard l(_load_buffer_lock); + int64_t total_buf_size = 0; + for (auto memtable_tracker = _load_buffers.begin(); memtable_tracker != _load_buffers.end();) { + auto m = memtable_tracker->lock(); + if (m == nullptr) { + memtable_tracker = _load_buffers.erase(memtable_tracker); + } else { + total_buf_size += m->consumption(); + ++memtable_tracker; + } + } + _load_buffer_size = total_buf_size; +} + void MemTrackerLimiter::make_type_trackers_profile(RuntimeProfile* profile, MemTrackerLimiter::Type type) { if (type == Type::GLOBAL) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index bca16290c17..6f0ecdbe975 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -35,6 +35,7 @@ #include "common/config.h" #include "common/status.h" #include "runtime/memory/mem_counter.h" +#include "runtime/memory/mem_tracker.h" #include "runtime/query_statistics.h" #include "util/string_util.h" #include "util/uid_util.h" @@ -45,6 +46,7 @@ class RuntimeProfile; class MemTrackerLimiter; constexpr size_t MEM_TRACKER_GROUP_NUM = 1000; +constexpr size_t QUERY_MIN_MEMORY = 32 * 1024 * 1024; struct TrackerLimiterGroup { // Note! in order to enable ExecEnv::mem_tracker_limiter_pool support resize, @@ -132,6 +134,7 @@ public: ~MemTrackerLimiter(); Type type() const { return _type; } + void set_overcommit(bool enable) { _enable_overcommit = enable; } const std::string& label() const { return _label; } std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; } int64_t group_num() const { return _group_num; } @@ -141,7 +144,6 @@ public: Status check_limit(int64_t bytes = 0); // Log the memory usage when memory limit is exceeded. std::string tracker_limit_exceeded_str(); - bool is_overcommit_tracker() const { return type() == Type::QUERY || type() == Type::LOAD; } void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; } bool is_query_cancelled() { return _is_query_cancelled; } void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); } @@ -208,7 +210,8 @@ public: if (UNLIKELY(bytes == 0)) { return true; } - bool rt = _mem_counter.try_add(bytes, _limit); + // Reserve checks the limit and should ignore the load buffer size.
+ bool rt = _mem_counter.try_add(bytes - _load_buffer_size, _limit); if (rt && _query_statistics) { _query_statistics->set_max_peak_memory_bytes(peak_consumption()); _query_statistics->set_current_used_memory_bytes(consumption()); @@ -234,6 +237,15 @@ public: static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num); static void make_all_tasks_tracker_profile(RuntimeProfile* profile); + void push_load_buffer(std::shared_ptr<MemTracker> memtable_tracker) { + std::lock_guard l(_load_buffer_lock); + _load_buffers.push_back(memtable_tracker); + } + + void update_load_buffer_size(); + + int64_t load_buffer_size() const { return _load_buffer_size; } + void print_log_usage(const std::string& msg); void enable_print_log_usage() { _enable_print_log_usage = true; } @@ -300,6 +312,7 @@ private: */ Type _type; + bool _enable_overcommit = true; // label used in the make snapshot, not guaranteed unique. std::string _label; @@ -327,6 +340,10 @@ std::shared_ptr<QueryStatistics> _query_statistics = nullptr; + std::mutex _load_buffer_lock; + std::vector<std::weak_ptr<MemTracker>> _load_buffers; + std::atomic<int64_t> _load_buffer_size = 0; + struct AddressSanitizer { size_t size; std::string stack_trace; @@ -356,10 +373,11 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) { } inline Status MemTrackerLimiter::check_limit(int64_t bytes) { - if (bytes <= 0 || (is_overcommit_tracker() && config::enable_query_memory_overcommit)) { + if (bytes <= 0 || _enable_overcommit) { return Status::OK(); } - if (_limit > 0 && consumption() + bytes > _limit) { + // The limit check should ignore the memtable size, because it is treated as a cache + if (_limit > 0 && consumption() - _load_buffer_size + bytes > _limit) { return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}", MemCounter::print_bytes(bytes), tracker_limit_exceeded_str())); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 9a316032bc9..83caf753aed 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -286,21 +286,6 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { // _untracked_mem store bytes that not synchronized to process reserved memory. flush_untracked_mem(); auto wg_ptr = _wg_wptr.lock(); - // For wg with overcommit, the limit will only task affect when memory > soft limit - // wg mgr will change wg's hard limit property. - if (wg_ptr != nullptr && wg_ptr->enable_memory_overcommit() && - !wg_ptr->has_changed_to_hard_limit()) { - // Only do a check here, do not real reserve. If we could reserve it, it is better, but the logic is too complicated.
- if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) { - return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>( - "reserve memory failed, size: {}, because {}", - PrettyPrinter::print(size, TUnit::BYTES), - GlobalMemoryArbitrator::process_mem_log_str()); - } else { - doris::GlobalMemoryArbitrator::release_process_reserved_memory(size); - return Status::OK(); - } - } if (!_limiter_tracker->try_reserve(size)) { auto err_msg = fmt::format( "reserve memory failed, size: {}, because query memory exceeded, memory tracker " @@ -313,7 +298,8 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { if (wg_ptr) { if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) { auto err_msg = fmt::format( - "reserve memory failed, size: {}, because wg memory exceeded, wg info: {}", + "reserve memory failed, size: {}, because workload group memory exceeded, " + "workload group: {}", PrettyPrinter::print(size, TUnit::BYTES), wg_ptr->memory_debug_string()); _limiter_tracker->release(size); // rollback _limiter_tracker->release_reserved(size); // rollback diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index b097ba91ee4..84b4cbc182d 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -146,6 +146,7 @@ void QueryContext::_init_query_mem_tracker() { if (_query_options.__isset.is_report_success && _query_options.is_report_success) { query_mem_tracker->enable_print_log_usage(); } + query_mem_tracker->set_overcommit(enable_mem_overcommit()); _user_set_mem_limit = bytes_limit; } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 746df124b07..04c32535100 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -253,10 +253,9 @@ public: return _query_options.__isset.query_slot_count ? _query_options.query_slot_count : 1; } - bool enable_query_slot_hard_limit() const { - return _query_options.__isset.enable_query_slot_hard_limit - ? _query_options.enable_query_slot_hard_limit - : false; + bool enable_mem_overcommit() const { + return _query_options.__isset.enable_mem_overcommit ? 
_query_options.enable_mem_overcommit + : false; } DescriptorTbl* desc_tbl = nullptr; bool set_rsc_info = false; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index e3f9d075c8f..4277173f1b7 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -304,6 +304,10 @@ std::shared_ptr<MemTrackerLimiter> RuntimeState::query_mem_tracker() const { return _query_mem_tracker; } +WorkloadGroupPtr RuntimeState::workload_group() { + return _query_ctx->workload_group(); +} + bool RuntimeState::log_error(const std::string& error) { std::lock_guard<std::mutex> l(_error_log_lock); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index bdc9a102fd6..31e1d378526 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -45,6 +45,7 @@ #include "io/fs/file_system.h" #include "io/fs/s3_file_system.h" #include "runtime/task_execution_context.h" +#include "runtime/workload_group/workload_group.h" #include "util/debug_util.h" #include "util/runtime_profile.h" #include "vec/columns/columns_number.h" @@ -450,6 +451,8 @@ public: QueryContext* get_query_ctx() { return _query_ctx; } + WorkloadGroupPtr workload_group(); + void set_query_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& tracker) { _query_mem_tracker = tracker; } diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index ba6681eb41a..2cc232e97df 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -51,12 +51,14 @@ const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50; const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80; // This is a invalid value, and should ignore this value during usage const static int TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE = 0; +const static int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20; WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info) : _id(tg_info.id), _name(tg_info.name), _version(tg_info.version), _memory_limit(tg_info.memory_limit), + _load_buffer_ratio(tg_info.load_buffer_ratio), _enable_memory_overcommit(tg_info.enable_memory_overcommit), _cpu_share(tg_info.cpu_share), _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM), @@ -83,29 +85,25 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info) std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes"); _total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>( _name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1); - _load_buffer_limit = (int64_t)(_memory_limit * 0.2); - // Its initial value should equal to memory limit, or it will be 0 and all reserve memory request will failed. 
- _weighted_memory_limit = _memory_limit; } std::string WorkloadGroup::debug_string() const { std::shared_lock<std::shared_mutex> rl {_mutex}; auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load(); - auto mem_used_ratio = realtime_total_mem_used / ((double)_weighted_memory_limit + 1); + auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 1); return fmt::format( "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, " "total_query_slot_count={}, " - "memory_limit = {}, " - "enable_memory_overcommit = {}, weighted_memory_limit = {}, total_mem_used = {}," + "memory_limit = {}, load_buffer_ratio = {}%, " + "enable_memory_overcommit = {}, total_mem_used = {}," "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, spill_low_watermark = " "{}, spill_high_watermark = {},cpu_hard_limit = {}, scan_thread_num = " "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, " "is_shutdown={}, query_num={}, " "read_bytes_per_second={}, remote_read_bytes_per_second={}]", _id, _name, _version, cpu_share(), _total_query_slot_count, - PrettyPrinter::print(_memory_limit, TUnit::BYTES), + PrettyPrinter::print(_memory_limit, TUnit::BYTES), _load_buffer_ratio, _enable_memory_overcommit ? "true" : "false", - PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES), PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES), PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), TUnit::BYTES), mem_used_ratio, _spill_low_watermark, _spill_high_watermark, cpu_hard_limit(), @@ -115,21 +113,18 @@ std::string WorkloadGroup::debug_string() const { } bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) { - // If a group is enable memory overcommit, then not need check the limit - // It is always true, and it will only fail when process memory is not - // enough. - if (_enable_memory_overcommit) { - if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(size)) { - return false; - } else { - return true; - } - } auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load() + size; if ((realtime_total_mem_used > ((double)_memory_limit * _spill_high_watermark.load(std::memory_order_relaxed) / 100))) { - return false; + // If a group enables memory overcommit, there is no need to check the limit; + // the check always passes, and the group only fails when process memory is not + // enough. + if (_enable_memory_overcommit) { + return true; + } else { + return false; + } } else { _wg_refresh_interval_memory_growth.fetch_add(size); return true; @@ -138,16 +133,15 @@ bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) { std::string WorkloadGroup::memory_debug_string() const { auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load(); - auto mem_used_ratio = realtime_total_mem_used / ((double)_weighted_memory_limit + 1); + auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 1); return fmt::format( "WorkloadGroup[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = " - "{}, weighted_memory_limit = {}, total_mem_used = {}," + "{}, total_mem_used = {}," "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, spill_low_watermark = " "{}, " "spill_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]", _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES), _enable_memory_overcommit ?
"true" : "false", - PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES), PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES), PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), TUnit::BYTES), mem_used_ratio, _spill_low_watermark, _spill_high_watermark, _version, _is_shutdown, @@ -181,6 +175,7 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { _scan_bytes_per_second = tg_info.read_bytes_per_second; _remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second; _total_query_slot_count = tg_info.total_query_slot_count; + _load_buffer_ratio = tg_info.load_buffer_ratio; } else { return; } @@ -188,9 +183,9 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { } // MemtrackerLimiter is not removed during query context release, so that should remove it here. -int64_t WorkloadGroup::make_memory_tracker_snapshots( - std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) { +int64_t WorkloadGroup::refresh_memory_usage() { int64_t used_memory = 0; + int64_t load_buffer_size = 0; for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { std::lock_guard<std::mutex> l(mem_tracker_group.group_lock); for (auto trackerWptr = mem_tracker_group.trackers.begin(); @@ -199,16 +194,16 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots( if (tracker == nullptr) { trackerWptr = mem_tracker_group.trackers.erase(trackerWptr); } else { - if (tracker_snapshots != nullptr) { - tracker_snapshots->insert(tracker_snapshots->end(), tracker); - } + tracker->update_load_buffer_size(); used_memory += tracker->consumption(); + load_buffer_size += tracker->load_buffer_size(); ++trackerWptr; } } } // refresh total memory used. _total_mem_used = used_memory; + _load_buffer_size = load_buffer_size; // reserve memory is recorded in the query mem tracker // and _total_mem_used already contains all the current reserve memory. // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth. @@ -218,7 +213,7 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots( } int64_t WorkloadGroup::memory_used() { - return make_memory_tracker_snapshots(nullptr); + return refresh_memory_usage(); } void WorkloadGroup::do_sweep() { @@ -255,6 +250,91 @@ void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> m _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr); } +int64_t WorkloadGroup::free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* profile) { + if (need_free_mem <= 0) { + return 0; + } + int64_t used_memory = memory_used(); + // Change need free mem to exceed limit + need_free_mem = std::min<int64_t>(used_memory - _memory_limit, need_free_mem); + if (need_free_mem <= 0) { + return 0; + } + + int64_t freed_mem = 0; + + std::string cancel_str = + fmt::format("Kill overcommit query, wg id:{}, name:{}, used:{}, limit:{}, backend:{}.", + _id, _name, MemCounter::print_bytes(used_memory), + MemCounter::print_bytes(_memory_limit), BackendOptions::get_localhost()); + + auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption, + const std::string& label) { + return fmt::format( + "{} cancel top memory overcommit tracker <{}> consumption {}. 
details:{}, " + "Execute again after enough memory, details see be.INFO.", + cancel_str, label, MemCounter::print_bytes(mem_consumption), + GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); + }; + + LOG(INFO) << fmt::format( + "Workload group start gc, id:{} name:{}, memory limit: {}, used: {}, " + "need_free_mem: {}.", + _id, _name, _memory_limit, used_memory, need_free_mem); + Defer defer {[&]() { + LOG(INFO) << fmt::format( + "Workload group finished gc, id:{} name:{}, memory limit: {}, used: " + "{}, need_free_mem: {}, freed memory: {}.", + _id, _name, _memory_limit, used_memory, need_free_mem, freed_mem); + }}; + + // 1. free top overcommit query + RuntimeProfile* tmq_profile = profile->create_child( + fmt::format("FreeGroupTopOvercommitQuery:Name {}", _name), true, true); + freed_mem += MemTrackerLimiter::free_top_overcommit_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, _mem_tracker_limiter_pool, + cancel_top_overcommit_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP); + // To be compatible with the non-group's gc logic, minorGC just gc overcommit query + if (freed_mem >= need_free_mem) { + return freed_mem; + } + auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) { + return fmt::format( + "{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute " + "again " + "after enough memory, details see be.INFO.", + cancel_str, label, MemCounter::print_bytes(mem_consumption), + GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str()); + }; + // 2. free top usage query + tmq_profile = + profile->create_child(fmt::format("FreeGroupTopUsageQuery:Name {}", _name), true, true); + freed_mem += MemTrackerLimiter::free_top_memory_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, _mem_tracker_limiter_pool, + cancel_top_usage_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP); + if (freed_mem >= need_free_mem) { + return freed_mem; + } + + // 3. free top overcommit load + tmq_profile = profile->create_child(fmt::format("FreeGroupTopOvercommitLoad:Name {}", _name), + true, true); + freed_mem += MemTrackerLimiter::free_top_overcommit_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, _mem_tracker_limiter_pool, + cancel_top_overcommit_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP); + if (freed_mem >= need_free_mem) { + return freed_mem; + } + + // 4. 
free top usage load + tmq_profile = + profile->create_child(fmt::format("FreeGroupTopUsageLoad:Name {}", _name), true, true); + freed_mem += MemTrackerLimiter::free_top_memory_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, _mem_tracker_limiter_pool, + cancel_top_usage_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP); + return freed_mem; +} + int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) { if (need_free_mem <= 0) { return 0; @@ -466,6 +546,12 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info( total_query_slot_count = tworkload_group_info.total_query_slot_count; } + // 17 load buffer memory limit + int load_buffer_ratio = LOAD_BUFFER_RATIO_DEFAULT_VALUE; + if (tworkload_group_info.__isset.load_buffer_ratio) { + load_buffer_ratio = tworkload_group_info.load_buffer_ratio; + } + return {.id = tg_id, .name = name, .cpu_share = cpu_share, @@ -481,7 +567,8 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info( .spill_high_watermark = spill_high_watermark, .read_bytes_per_second = read_bytes_per_second, .remote_read_bytes_per_second = remote_read_bytes_per_second, - .total_query_slot_count = total_query_slot_count}; + .total_query_slot_count = total_query_slot_count, + .load_buffer_ratio = load_buffer_ratio}; } void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index dc857a9f40d..c88f1e3604b 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -80,14 +80,14 @@ public: int64_t total_mem_used() const { return _total_mem_used; } - void set_weighted_memory_limit(int64_t weighted_memory_limit) { - _weighted_memory_limit = weighted_memory_limit; - } + int64_t load_mem_used() const { return _load_buffer_size; } + + void enable_load_buffer_limit(bool enable_limit) { _enable_load_buffer_limit = enable_limit; } + + bool enable_load_buffer_limit() const { return _enable_load_buffer_limit; } // make memory snapshots and refresh total memory used at the same time. - int64_t make_memory_tracker_snapshots( - std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots); - // call make_memory_tracker_snapshots, so also refresh total memory used. 
+ int64_t refresh_memory_usage(); int64_t memory_used(); void do_sweep(); @@ -120,20 +120,6 @@ public: _spill_high_watermark.load(std::memory_order_relaxed) / 100)); } - void update_load_mem_usage(int64_t active_bytes, int64_t queue_bytes, int64_t flush_bytes) { - std::unique_lock<std::shared_mutex> wlock(_mutex); - _active_mem_usage = active_bytes; - _queue_mem_usage = queue_bytes; - _flush_mem_usage = flush_bytes; - } - - void get_load_mem_usage(int64_t* active_bytes, int64_t* queue_bytes, int64_t* flush_bytes) { - std::shared_lock<std::shared_mutex> r_lock(_mutex); - *active_bytes += _active_mem_usage; - *queue_bytes += _queue_mem_usage; - *flush_bytes += _flush_mem_usage; - } - std::string debug_string() const; std::string memory_debug_string() const; @@ -222,11 +208,9 @@ public: return _memtable_flush_pool.get(); } - int64_t load_buffer_limit() { return _load_buffer_limit; } + int64_t load_buffer_limit() const { return _memory_limit * _load_buffer_ratio / 100; } - bool has_changed_to_hard_limit() const { return _has_changed_hard_limit; } - - void change_to_hard_limit(bool to_hard_limit) { _has_changed_hard_limit = to_hard_limit; } + int64_t free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* profile); private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit @@ -237,19 +221,11 @@ private: // For example, load memtable, write to parquet. // If the wg's memory reached high water mark, then the load buffer // will be restricted to this limit. - int64_t _load_buffer_limit; - std::atomic<bool> _has_changed_hard_limit = false; - - // memory used by load memtable - int64_t _active_mem_usage = 0; - int64_t _queue_mem_usage = 0; - int64_t _flush_mem_usage = 0; - - // `weighted_memory_limit` less than or equal to _memory_limit, calculate after exclude public memory. - // more detailed description in `refresh_wg_weighted_memory_limit`. - std::atomic<int64_t> _weighted_memory_limit {0}; // - // last value of make_memory_tracker_snapshots, refresh every time make_memory_tracker_snapshots is called. + int64_t _load_buffer_ratio = 0; + std::atomic<bool> _enable_load_buffer_limit = false; + std::atomic_int64_t _total_mem_used = 0; // bytes + std::atomic_int64_t _load_buffer_size = 0; std::atomic_int64_t _wg_refresh_interval_memory_growth; bool _enable_memory_overcommit; std::atomic<uint64_t> _cpu_share; @@ -307,6 +283,7 @@ struct WorkloadGroupInfo { const int read_bytes_per_second = -1; const int remote_read_bytes_per_second = -1; const int total_query_slot_count = 0; + const int load_buffer_ratio = 0; // log cgroup cpu info uint64_t cgroup_cpu_shares = 0; int cgroup_cpu_hard_limit = 0; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index b42aeeb1b43..784c09555c4 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -162,18 +162,16 @@ struct WorkloadGroupMemInfo { std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots = std::list<std::shared_ptr<MemTrackerLimiter>>(); }; + void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { std::shared_lock<std::shared_mutex> r_lock(_group_mutex); // 1. make all workload groups memory snapshots(refresh workload groups total memory used at the same time) // and calculate total memory used of all queries. 
int64_t all_workload_groups_mem_usage = 0; - std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info; bool has_wg_exceed_limit = false; for (auto& [wg_id, wg] : _workload_groups) { - wgs_mem_info[wg_id].total_mem_used = - wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots); - all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used; + all_workload_groups_mem_usage += wg->refresh_memory_usage(); if (wg->exceed_limit()) { has_wg_exceed_limit = true; } @@ -219,7 +217,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { weighted_memory_limit_ratio); LOG_EVERY_T(INFO, 60) << debug_msg; for (auto& wg : _workload_groups) { - update_queries_limit(wg.second, false); + update_queries_limit_(wg.second, false); } } @@ -264,43 +262,27 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que } } -/** - * 1. When Process's memory is lower than soft limit, then all workload group will be converted to hard limit (Exception: there is only one workload group). - * 2. Reserve logic for workload group that is soft limit take no effect, it will always return success. - * 3. QueryLimit for streamload,routineload,group commit, take no affect, it will always return success, but workload group's hard limit will take affect. - * 4. See handle_non_overcommit_wg_paused_queries for hard limit logic. - */ -void WorkloadGroupMgr::handle_paused_queries() { - handle_non_overcommit_wg_paused_queries(); - handle_overcommit_wg_paused_queries(); -} - /** * Strategy 1: A revocable query should not have any running task(PipelineTask). * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit * strategy 3: If any query exceed process memlimit, then should clear all caches. * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it. * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: - * 1. cancel other wg's(soft limit) query that exceed limit - * 2. spill disk - * 3. cancel it self. */ -void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { +void WorkloadGroupMgr::handle_paused_queries() { const int64_t TIMEOUT_IN_QUEUE = 1000L * 10; std::unique_lock<std::mutex> lock(_paused_queries_lock); - std::vector<std::weak_ptr<QueryContext>> resume_after_gc; + bool has_revoked_from_other_group = false; for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { auto& queries_list = it->second; const auto& wg = it->first; - if (queries_list.empty()) { - it = _paused_queries_list.erase(it); - continue; - } + bool is_low_wartermark = false; bool is_high_wartermark = false; - wg->check_mem_used(&is_low_wartermark, &is_high_wartermark); + bool has_changed_hard_limit = false; + int64_t flushed_memtable_bytes = 0; // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. // The query's memlimit is set using slot mechanism and its value is set using the user settings, not // by weighted value. So if reserve failed, then it is actually exceed limit. @@ -317,29 +299,12 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { query_it = queries_list.erase(query_it); continue; } - bool wg_changed_to_hard_limit = wg->has_changed_to_hard_limit(); - // Only deal with non overcommit workload group. 
- if (wg->enable_memory_overcommit() && !wg_changed_to_hard_limit && - !query_ctx->paused_reason().is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) { - // Soft limit wg will only reserve failed when process limit exceed. But in some corner case, - // when reserve, the wg is hard limit, the query reserve failed, but when this loop run - // the wg is converted to soft limit. - // So that should resume the query. - LOG(WARNING) << "query: " << print_id(query_ctx->query_id()) - << " reserve memory failed, but workload group not converted to hard " - "limit, it should not happen, resume it again. paused reason: " - << query_ctx->paused_reason(); - query_ctx->set_memory_sufficient(true); - query_it = queries_list.erase(query_it); - continue; - } if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { - CHECK(!wg->enable_memory_overcommit() || wg_changed_to_hard_limit); // Streamload, kafka load, group commit will never have query memory exceeded error because // their query limit is very large. - bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_, - query_ctx->paused_reason()); + bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_, + query_ctx->paused_reason()); if (!spill_res) { ++query_it; continue; @@ -348,7 +313,21 @@ continue; } } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { - CHECK(!wg->enable_memory_overcommit() || wg_changed_to_hard_limit); + // Only deal with non-overcommit workload groups here. + if (wg->enable_memory_overcommit()) { + // A soft-limit wg only fails to reserve when the process limit is exceeded. But in some corner case, + // the wg was hard limit when the reserve failed, and by the time this loop runs + // the wg has been converted back to soft limit. + // In that case the query should be resumed. + LOG(WARNING) + << "query: " << print_id(query_ctx->query_id()) + << " reserve memory failed because the workload group memlimit was " + "exceeded; it should not happen, resume it again. paused reason: " + << query_ctx->paused_reason(); + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } // check if the reserve is too large, if it is too large, // should set the query's limit only. // Check the query's reserve with expected limit. @@ -358,13 +337,28 @@ query_ctx->set_memory_sufficient(true); LOG(INFO) << "workload group memory reserve failed because " << query_ctx->debug_string() << " reserve size " - << query_it->reserve_size_ << " is too large, set hard limit to " - << query_ctx->expected_mem_limit() << " and resume running."; + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << " is too large, set hard limit to " + << PrettyPrinter::print_bytes(query_ctx->expected_mem_limit()) + << " and resume running."; query_it = queries_list.erase(query_it); continue; } + if (flushed_memtable_bytes < 0) { + flushed_memtable_bytes = flush_memtable_from_current_group_( + query_ctx, wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtables; just wait for the flush to finish and do nothing more.
+ wg->enable_load_buffer_limit(true); + ++query_it; + continue; + } else { + // If memory could not be revoked by flushing memtables, disable the load buffer limit + wg->enable_load_buffer_limit(false); + } if (!has_changed_hard_limit) { - update_queries_limit(wg, true); + update_queries_limit_(wg, true); has_changed_hard_limit = true; LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << " reserve memory failed due to workload group memory exceed, " "set workload group context to hard limit, " "so that other query will reduce their memory. wg: " << wg->debug_string(); } - // If there are a lot of memtable memory, then wait them flush finished. - MemTableMemoryLimiter* memtable_limiter = - doris::ExecEnv::GetInstance()->memtable_memory_limiter(); - // Not use memlimit, should use high water mark. - int64_t memtable_active_bytes = 0; - int64_t memtable_queue_bytes = 0; - int64_t memtable_flush_bytes = 0; - wg->get_load_mem_usage(&memtable_active_bytes, &memtable_queue_bytes, - &memtable_flush_bytes); - // TODO: should add a signal in memtable limiter to prevent new batch - // For example, streamload, it will not reserve many memory, but it will occupy many memtable memory. - // TODO: 0.2 should be a workload group properties. For example, the group is optimized for load,then the value - // should be larged, if the group is optimized for query, then the value should be smaller. - int64_t max_wg_memtable_bytes = wg->load_buffer_limit(); - if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes > - max_wg_memtable_bytes) { - // There are many table in flush queue, just waiting them flush finished. - if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) { - LOG_EVERY_T(INFO, 60) - << wg->name() << " load memtable size is: " << memtable_active_bytes - << ", " << memtable_queue_bytes << ", " << memtable_flush_bytes - << ", load buffer limit is: " << max_wg_memtable_bytes - << " wait for flush finished to release more memory"; - continue; - } else { - // Flush some memtables(currently written) to flush queue. - memtable_limiter->flush_workload_group_memtables( - wg->id(), - memtable_active_bytes - (int64_t)(max_wg_memtable_bytes * 0.6)); - LOG_EVERY_T(INFO, 60) - << wg->name() << " load memtable size is: " << memtable_active_bytes - << ", " << memtable_queue_bytes << ", " << memtable_flush_bytes - << ", flush some active memtable to revoke memory"; - continue; - } - } // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient, // and then set wg's flag, other query may not free memory very quickly. if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) { LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << " will be resume."; query_ctx->set_memory_sufficient(true); query_it = queries_list.erase(query_it); + continue; } else { ++query_it; + continue; } - continue; } else { - // PROCESS Reserve logic using hard limit, if reached here, should try to spill or cancel. - // GC Logic also work at hard limit, so GC may cancel some query and could not spill here. // If wg's memlimit not exceed, but process memory exceed, it means cache or other metadata // used too much memory. Should clean all cache here. // 1. Check cache used, if cache is larger than > 0, then just return and wait for it to 0 to release some memory.
@@ -437,34 +394,43 @@ "to 0 now"; } if (query_it->cache_ratio_ < 0.001) { - if (query_it->any_wg_exceed_limit_) { - if (wg->enable_memory_overcommit()) { - if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) { - resume_after_gc.push_back(query_ctx); + // 1. Check whether some memory could be revoked from memtables + if (flushed_memtable_bytes < 0) { + flushed_memtable_bytes = flush_memtable_from_current_group_( + query_ctx, wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtables; just wait for the flush to finish and do nothing more. + ++query_it; + continue; + } + // TODO: should wait here to check if the process has released revoked_size memory and then continue. + if (!has_revoked_from_other_group) { + int64_t revoked_size = revoke_memory_from_other_group_( + query_ctx, wg->enable_memory_overcommit(), query_it->reserve_size_); + if (revoked_size > 0) { + has_revoked_from_other_group = true; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + // It does not matter if revoked_size > reserve size; just try to run again. + continue; + } else { + bool spill_res = handle_single_query_( + query_ctx, query_it->reserve_size_, query_ctx->paused_reason()); + if (spill_res) { query_it = queries_list.erase(query_it); continue; } else { ++query_it; continue; } - } else { - // current workload group is hard limit, should not wait other wg with - // soft limit, just cancel - resume_after_gc.push_back(query_ctx); - query_it = queries_list.erase(query_it); - continue; } } else { - // TODO: Find other exceed limit workload group and cancel query. - bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_, - query_ctx->paused_reason()); - if (!spill_res) { - ++query_it; - continue; - } else { - query_it = queries_list.erase(query_it); - continue; - } + // If any query was cancelled during the process-limit stage, resume the other + // queries and skip further checks for now. + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; } } if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted < @@ -479,68 +445,148 @@ ++query_it; } } + // No memtable flush is pending and the group is below the low watermark, so disable the load buffer limit + if (flushed_memtable_bytes <= 0 && !is_low_wartermark) { + wg->enable_load_buffer_limit(false); + } - // Finished deal with one workload group, and should deal with next one. - ++it; - } - // TODO minor GC to release some query - if (!resume_after_gc.empty()) { + if (queries_list.empty()) { + it = _paused_queries_list.erase(it); + continue; + } else { + // Finished dealing with one workload group; move on to the next one. + ++it; + } } - for (auto resume_it = resume_after_gc.begin(); resume_it != resume_after_gc.end(); - ++resume_it) { - auto query_ctx = resume_it->lock(); - if (query_ctx != nullptr) { - query_ctx->set_memory_sufficient(true); +} + +// Return the number of bytes expected to be freed once the memtables flush +int64_t WorkloadGroupMgr::flush_memtable_from_current_group_( + std::shared_ptr<QueryContext> requestor, WorkloadGroupPtr wg, int64_t need_free_mem) { + // If a lot of memory is held by memtables, wait for them to finish flushing.
+ MemTableMemoryLimiter* memtable_limiter = + doris::ExecEnv::GetInstance()->memtable_memory_limiter(); + int64_t memtable_active_bytes = 0; + int64_t memtable_queue_bytes = 0; + int64_t memtable_flush_bytes = 0; + memtable_limiter->get_workload_group_memtable_usage( + wg->id(), &memtable_active_bytes, &memtable_queue_bytes, &memtable_flush_bytes); + // TODO: should add a signal in memtable limiter to prevent new batches + // For example, streamload will not reserve much memory, but it will occupy a lot of memtable memory. + // TODO: 0.2 should be a workload group property. For example, if the group is optimized for load, then the value + // should be larger; if the group is optimized for query, then the value should be smaller. + int64_t max_wg_memtable_bytes = wg->load_buffer_limit(); + if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes > + max_wg_memtable_bytes) { + // There are many memtables in the flush queue; just wait for them to finish flushing. + if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) { + LOG_EVERY_T(INFO, 60) << wg->name() + << " load memtable size is: " << memtable_active_bytes << ", " + << memtable_queue_bytes << ", " << memtable_flush_bytes + << ", load buffer limit is: " << max_wg_memtable_bytes + << " wait for flush finished to release more memory"; + return memtable_queue_bytes + memtable_flush_bytes; + } else { + // Flush some memtables(currently written) to flush queue. + memtable_limiter->flush_workload_group_memtables( + wg->id(), memtable_active_bytes - (int64_t)(max_wg_memtable_bytes * 0.6)); + LOG_EVERY_T(INFO, 60) << wg->name() + << " load memtable size is: " << memtable_active_bytes << ", " + << memtable_queue_bytes << ", " << memtable_flush_bytes + << ", flush some active memtable to revoke memory"; + return memtable_queue_bytes + memtable_flush_bytes + memtable_active_bytes - + (int64_t)(max_wg_memtable_bytes * 0.6); } } return 0; } int64_t WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor, bool hard_limit, int64_t need_free_mem) { int64_t total_freed_mem = 0; std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("RevokeMemory"); // 1. memtable-like memory // 2. queries that exceed the workload group limit int64_t freed_mem = revoke_overcommited_memory_(requestor, need_free_mem, profile.get()); total_freed_mem += freed_mem; // The revoke process may kill the current requestor, so return now. if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { return total_freed_mem; } if (hard_limit) { freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - total_freed_mem, doris::QUERY_MIN_MEMORY, profile.get()); } else { freed_mem = cancel_top_query_in_overcommit_group_( need_free_mem - total_freed_mem, requestor->get_mem_tracker()->consumption(), profile.get()); } total_freed_mem += freed_mem; // The revoke process may kill the current requestor, so return now. if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { return total_freed_mem; } return total_freed_mem; } -void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() { - std::shared_lock<std::shared_mutex> r_lock(_group_mutex); - // If there is only one workload group and it is overcommit, then do nothing. - // And should also start MinorGC logic.
- if (_workload_groups.size() == 1) { - return; +// Revoke memory from workload groups that exceed their limit. For example, if a wg's limit is 10g but it uses 12g, +// then 2g should be revoked from the group. +int64_t WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryContext> requestor, + int64_t need_free_mem, + RuntimeProfile* profile) { + int64_t total_freed_mem = 0; + // 1. Check memtable usage, and try to free it. + int64_t freed_mem = revoke_memtable_from_overcommited_groups_(need_free_mem, profile); + total_freed_mem += freed_mem; + // The revoke process may kill the current requestor, so return now. + if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + return total_freed_mem; } - // soft_limit - 10%, will change workload group to hard limit. - // soft limit, process memory reserve failed. - // hard limit, FullGC will kill query randomly. - if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit( - (int64_t)(MemInfo::mem_limit() * 0.1))) { - for (auto& [wg_id, wg] : _workload_groups) { - if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit()) { - wg->change_to_hard_limit(true); - LOG(INFO) << "Process memory usage + 10% will exceed soft limit, change all " - "workload " - "group with overcommit to hard limit now. " - << wg->debug_string(); - } + // 2. Cancel top usage query, one by one + std::map<WorkloadGroupPtr, int64_t> wg_mem_usage; + using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>; + auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) { + return left.second < right.second; + }; + std::priority_queue<WorkloadGroupMem, std::vector<WorkloadGroupMem>, decltype(cmp)> heap(cmp); + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { + heap.emplace(iter->second, iter->second->memory_used()); } } - // If current memory usage is below soft memlimit - 15%, then enable wg's overcommit - if (!doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit( - (int64_t)(MemInfo::mem_limit() * 0.15))) { - for (auto& [wg_id, wg] : _workload_groups) { - if (wg->enable_memory_overcommit() && wg->has_changed_to_hard_limit()) { - wg->change_to_hard_limit(false); - LOG(INFO) << "Process memory usage is lower than soft limit, enable all workload " - "group overcommit now. " - << wg->debug_string(); - } - } + while (!heap.empty() && need_free_mem - total_freed_mem > 0 && !requestor->is_cancelled()) { + auto [wg, sort_mem] = heap.top(); + heap.pop(); + freed_mem = wg->free_overcommited_memory(need_free_mem - total_freed_mem, profile); + total_freed_mem += freed_mem; } + return total_freed_mem; } -// If the query could release some memory, for example, spill disk, flush memtable then the return value is true. +// If the memtables are too large, flush them and wait for the flush to finish. +int64_t WorkloadGroupMgr::revoke_memtable_from_overcommited_groups_(int64_t need_free_mem, + RuntimeProfile* profile) { + return 0; +} + +// 1. Sort all memory limiters in all overcommit wgs, and cancel the top-usage task holding the most memory. +// 2. It may not be a valid target, because its memory does not exceed the limit. +int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t need_free_mem, + int64_t lower_bound, + RuntimeProfile* profile) { + return 0; +} + +// streamload, kafka routine load, group commit +// insert into select +// select + +// If the query could release some memory, for example, spill disk, then the return value is true.
// If the query could not release memory, then cancel the query, the return value is true. -// If the query is not ready to do these tasks, it means just wait. -bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_ctx, - size_t size_to_reserve, Status paused_reason) { - // TODO: If the query is an insert into select query, should consider memtable as revoke memory. +// If the query is not ready to do these tasks, it just needs to wait, and the return value is false. +bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_ctx, + size_t size_to_reserve, Status paused_reason) { size_t revocable_size = 0; size_t memory_usage = 0; bool has_running_task = false; @@ -556,6 +602,8 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c if (revocable_tasks.empty()) { if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { const auto limit = query_ctx->get_mem_limit(); + // While waiting, another operator in the query may finish and release + // a lot of memory, allowing us to run. if ((memory_usage + size_to_reserve) < limit) { LOG(INFO) << "query: " << query_id << ", usage(" << memory_usage << " + " << size_to_reserve << ") less than limit(" << limit << "), resume it."; @@ ... } else { // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( - "query({}) reserve memory failed, but could not find memory that could " + "query({}) reserve memory failed, but could not find memory that " + "could " "release or spill to disk(usage:{}, limit: {})", query_id, memory_usage, query_ctx->get_mem_limit())); } @@ -584,7 +633,8 @@ << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( "The query({}) reserved memory failed because process limit exceeded, and " - "there is no cache now. And could not find task to spill. Maybe you should set " + "there is no cache now. And could not find task to spill.
-void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_hard_limit) {
+void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit) {
     auto wg_mem_limit = wg->memory_limit();
-    auto wg_weighted_mem_limit = int64_t(wg_mem_limit * 1);
-    wg->set_weighted_memory_limit(wg_weighted_mem_limit);
     auto all_query_ctxs = wg->queries();
     bool is_low_wartermark = false;
     bool is_high_wartermark = false;
     wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
     int64_t wg_high_water_mark_limit =
             (int64_t)(wg_mem_limit * wg->spill_threshold_high_water_mark() * 1.0 / 100);
-    int64_t memtable_active_bytes = 0;
-    int64_t memtable_queue_bytes = 0;
-    int64_t memtable_flush_bytes = 0;
-    wg->get_load_mem_usage(&memtable_active_bytes, &memtable_queue_bytes, &memtable_flush_bytes);
-    int64_t memtable_usage = memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes;
+    int64_t memtable_usage = wg->load_mem_used();
     int64_t wg_high_water_mark_except_load = wg_high_water_mark_limit;
     if (memtable_usage > wg->load_buffer_limit()) {
         wg_high_water_mark_except_load = wg_high_water_mark_limit - wg->load_buffer_limit();
@@ -620,20 +664,23 @@ void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
     std::string debug_msg;
     if (is_high_wartermark || is_low_wartermark) {
         debug_msg = fmt::format(
-                "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem limit: {}, "
+                "\nWorkload Group {}: mem limit: {}, mem used: {}, "
                 "high water mark mem limit: {}, load memtable usage: {}, used ratio: {}",
                 wg->name(), PrettyPrinter::print(wg->memory_limit(), TUnit::BYTES),
                 PrettyPrinter::print(wg->total_mem_used(), TUnit::BYTES),
-                PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
                 PrettyPrinter::print(wg_high_water_mark_limit, TUnit::BYTES),
                 PrettyPrinter::print(memtable_usage, TUnit::BYTES),
-                (double)(wg->total_mem_used()) / wg_weighted_mem_limit);
+                (double)(wg->total_mem_used()) / wg_mem_limit);
     }
     // If the wg enable over commit memory, then it is no need to update query memlimit
-    if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit()) {
+    if (wg->enable_memory_overcommit()) {
         return;
     }
+    // If the low watermark has been reached, enable the load buffer limit.
+    if (is_low_wartermark) {
+        wg->enable_load_buffer_limit(true);
+    }
     int32_t total_used_slot_count = 0;
     int32_t total_slot_count = wg->total_query_slot_count();
     // calculate total used slot count
@@ -657,7 +704,7 @@ void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
         int64_t query_weighted_mem_limit = 0;
         int64_t expected_query_weighted_mem_limit = 0;
         // If the query enable hard limit, then it should not use the soft limit
-        if (query_ctx->enable_query_slot_hard_limit()) {
+        if (!query_ctx->enable_mem_overcommit()) {
             if (total_slot_count < 1) {
                 LOG(WARNING)
                         << "query " << print_id(query_ctx->query_id())
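To make the watermark arithmetic above concrete: under this commit's new defaults (SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 90 and LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20, both set later in this patch), a 10 GB group has a 9 GB high water mark and a 2 GB load buffer, so once memtables exceed 2 GB, queries are capped at 9 GB - 2 GB = 7 GB. A checkable rendition with those assumed sizes:

    #include <cstdint>
    #include <cstdio>

    int main() {
        // Assumed example values; the percentages are the new defaults in this commit.
        const int64_t wg_mem_limit = 10LL << 30;                    // 10 GB group limit
        const int64_t high_water_mark_pct = 90;                     // spill high watermark
        const int64_t load_buffer_limit = wg_mem_limit * 20 / 100;  // 20% => 2 GB

        const int64_t high_water_mark = wg_mem_limit * high_water_mark_pct / 100; // 9 GB
        const int64_t memtable_usage = 3LL << 30;                   // pretend loads hold 3 GB

        // Mirrors the logic above: once memtable usage exceeds the load buffer limit,
        // queries only get the high water mark minus the reserved load buffer.
        int64_t high_water_mark_except_load = high_water_mark;
        if (memtable_usage > load_buffer_limit) {
            high_water_mark_except_load = high_water_mark - load_buffer_limit; // 7 GB
        }
        std::printf("queries may use up to %lld bytes\n",
                    (long long)high_water_mark_except_load);
        return 0;
    }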
@@ -701,21 +748,4 @@ void WorkloadGroupMgr::stop() {
     }
 }
 
-void WorkloadGroupMgr::update_load_memtable_usage(
-        const std::map<uint64_t, MemtableUsage>& wg_memtable_usages) {
-    // Use readlock here, because it will not modify workload_groups
-    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
-    for (auto it = _workload_groups.begin(); it != _workload_groups.end(); ++it) {
-        auto wg_usage = wg_memtable_usages.find(it->first);
-        if (wg_usage != wg_memtable_usages.end()) {
-            it->second->update_load_mem_usage(wg_usage->second.active_mem_usage,
-                                              wg_usage->second.queue_mem_usage,
-                                              wg_usage->second.flush_mem_usage);
-        } else {
-            // Not anything in memtable limiter, then set to 0
-            it->second->update_load_mem_usage(0, 0, 0);
-        }
-    }
-}
-
 } // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index c8e1ee7adf7..065528c66ec 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -105,14 +105,20 @@ public:
 
     void handle_paused_queries();
 
-    void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>& wg_memtable_usages);
-
 private:
-    bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, size_t size_to_reserve,
-                             Status paused_reason);
-    void handle_non_overcommit_wg_paused_queries();
-    void handle_overcommit_wg_paused_queries();
-    void update_queries_limit(WorkloadGroupPtr wg, bool enable_hard_limit);
+    int64_t cancel_top_query_in_overcommit_group_(int64_t need_free_mem, int64_t lower_bound,
+                                                  RuntimeProfile* profile);
+    int64_t flush_memtable_from_current_group_(std::shared_ptr<QueryContext> requestor,
+                                               WorkloadGroupPtr wg, int64_t need_free_mem);
+    bool handle_single_query_(std::shared_ptr<QueryContext> query_ctx, size_t size_to_reserve,
+                              Status paused_reason);
+    int64_t revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor,
+                                            bool hard_limit, int64_t need_free_mem);
+    int64_t revoke_overcommited_memory_(std::shared_ptr<QueryContext> requestor,
+                                        int64_t need_free_mem, RuntimeProfile* profile);
+    int64_t revoke_memtable_from_overcommited_groups_(int64_t need_free_mem,
+                                                      RuntimeProfile* profile);
+    void update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit);
 
 private:
     std::shared_mutex _group_mutex;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 96dfd85d297..df30e6777d9 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -503,7 +503,8 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
     }
     {
         SCOPED_TIMER(_wait_mem_limit_timer);
-        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
+        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush(
+                _state->workload_group());
     }
     SCOPED_TIMER(_write_memtable_timer);
     st = delta_writer->write(block.get(), rows.row_idxes);
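The call-site change above swaps the process-wide memtable check for a per-workload-group gate: the writer hands its workload group to the limiter, which may block the write until that group's load buffer usage drops. A schematic of the gating pattern only (both stub types are hypothetical; the real entry point is MemTableMemoryLimiter::handle_workload_group_memtable_flush):

    #include <memory>

    struct WorkloadGroupStub {};

    struct LimiterStub {
        // May block the calling thread until the group's memtable usage drops;
        // the real waiting/flushing policy is elided here.
        void wait_for_load_buffer(const std::shared_ptr<WorkloadGroupStub>& /*wg*/) {}
    };

    void write_one_block(LimiterStub& limiter, std::shared_ptr<WorkloadGroupStub> wg) {
        limiter.wait_for_load_buffer(wg); // gate first ...
        // ... then do the actual write, e.g. delta_writer->write(block, row_idxes);
    }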
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 97cedeafd03..52e78618c05 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -633,7 +633,7 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String QUERY_SLOT_COUNT = "query_slot_count";
 
-    public static final String ENABLE_QUERY_SLOT_HARD_LIMIT = "enable_query_slot_hard_limit";
+    public static final String ENABLE_MEM_OVERCOMMIT = "enable_mem_overcommit";
 
     public static final String MAX_COLUMN_READER_NUM = "max_column_reader_num";
@@ -727,7 +727,7 @@ public class SessionVariable implements Serializable, Writable {
     public long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS;
 
     // max memory used on every backend.
-    @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
+    @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, needForward = true)
     public long maxExecMemByte = 2147483648L;
 
     @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT,
@@ -829,10 +829,10 @@
         }
     }
 
-    @VariableMgr.VarAttr(name = ENABLE_QUERY_SLOT_HARD_LIMIT, needForward = true, description = {
-            "是否通过硬限的方式来计算每个Slot的内存资源",
-            "Whether to calculate the memory resources of each Slot by hard limit"})
-    public boolean enableQuerySlotHardLimit = false;
+    @VariableMgr.VarAttr(name = ENABLE_MEM_OVERCOMMIT, needForward = true, description = {
+            "是否通过硬限的方式来计算每个Query的内存资源",
+            "Whether to calculate the memory resources of each query by hard limit"})
+    public boolean enableMemOvercommit = true;
 
     @VariableMgr.VarAttr(name = MAX_COLUMN_READER_NUM)
     public int maxColumnReaderNum = 20000;
@@ -3883,7 +3883,7 @@
         tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
         tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
         tResult.setQuerySlotCount(wgQuerySlotCount);
-        tResult.setEnableQuerySlotHardLimit(enableQuerySlotHardLimit);
+        tResult.setEnableMemOvercommit(enableMemOvercommit);
 
         tResult.setKeepCarriageReturn(keepCarriageReturn);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 42929be609b..b428a058277 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -57,6 +57,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
 
     public static final String ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
 
+    public static final String LOAD_BUFFER_RATIO = "load_buffer_ratio";
+
     public static final String MAX_CONCURRENCY = "max_concurrency";
 
     public static final String MAX_QUEUE_SIZE = "max_queue_size";
@@ -87,10 +89,12 @@
             .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
             .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
             .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
-            .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
+            .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND)
+            .add(LOAD_BUFFER_RATIO).build();
 
-    public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
-    public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
+    public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 75;
+    public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 90;
+    public static final int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20;
 
     @SerializedName(value = "id")
     private long id;
@@ -126,6 +130,17 @@
             this.memoryLimitPercent = Double.parseDouble(
                     memoryLimitString.substring(0, memoryLimitString.length() - 1));
         }
+
+        if (properties.containsKey(LOAD_BUFFER_RATIO)) {
+            String loadBufLimitStr = properties.get(LOAD_BUFFER_RATIO);
+            if (loadBufLimitStr.endsWith("%")) {
+                loadBufLimitStr = loadBufLimitStr.substring(0, loadBufLimitStr.length() - 1);
+            }
+            this.properties.put(LOAD_BUFFER_RATIO, loadBufLimitStr);
+        } else {
+            this.properties.put(LOAD_BUFFER_RATIO, LOAD_BUFFER_RATIO_DEFAULT_VALUE + "");
+        }
+
         if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
             properties.put(ENABLE_MEMORY_OVERCOMMIT,
                     properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase());
         }
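The constructor hunk above stores load_buffer_ratio with any trailing '%' stripped and falls back to the 20% default; the validation hunk below then rejects anything that is not a non-negative integer percentage. A stand-alone rendition of the same rule (in C++ for consistency with the other sketches; the authoritative check is the Java code below):

    #include <cstdio>
    #include <stdexcept>
    #include <string>

    // The value must be a non-negative integer ending with '%', e.g. "20%".
    int parse_percent_or_throw(const std::string& value) {
        if (value.empty() || value.back() != '%') {
            throw std::invalid_argument(value + " requires a percentage ending with '%'");
        }
        int pct = std::stoi(value.substr(0, value.size() - 1)); // throws if not numeric
        if (pct < 0) {
            throw std::invalid_argument(value + " requires a non-negative int number");
        }
        return pct;
    }

    int main() {
        std::printf("%d\n", parse_percent_or_throw("20%")); // prints 20
        return 0;
    }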
@@ -256,6 +271,26 @@
         }
     }
 
+        if (properties.containsKey(LOAD_BUFFER_RATIO)) {
+            String memoryLimit = properties.get(LOAD_BUFFER_RATIO);
+            if (!memoryLimit.endsWith("%")) {
+                throw new DdlException(LOAD_BUFFER_RATIO + " " + memoryLimit
+                        + " requires a percentage and ends with a '%'");
+            }
+            String memLimitErr = LOAD_BUFFER_RATIO + " " + memoryLimit
+                    + " requires a positive int number.";
+            try {
+                if (Integer.parseInt(memoryLimit.substring(0, memoryLimit.length() - 1)) < 0) {
+                    throw new DdlException(memLimitErr);
+                }
+            } catch (NumberFormatException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(memLimitErr, e);
+                }
+                throw new DdlException(memLimitErr);
+            }
+        }
+
         if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
             String value = properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase();
             if (!("true".equals(value) || "false".equals(value))) {
@@ -482,6 +517,12 @@
             row.add("-1");
         } else if (MEMORY_LIMIT.equals(key) && !properties.containsKey(key)) {
             row.add("0%");
+        } else if (LOAD_BUFFER_RATIO.equals(key)) {
+            if (properties.containsKey(key)) {
+                row.add(properties.get(key) + "%");
+            } else {
+                row.add(LOAD_BUFFER_RATIO_DEFAULT_VALUE + "%");
+            }
         } else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) {
             row.add("true");
         } else if (SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
@@ -568,6 +609,10 @@
         if (memLimitStr != null) {
             tWorkloadGroupInfo.setMemLimit(memLimitStr);
         }
+        String loadBufferRatioStr = properties.get(LOAD_BUFFER_RATIO);
+        if (loadBufferRatioStr != null) {
+            tWorkloadGroupInfo.setLoadBufferRatio(Integer.parseInt(loadBufferRatioStr));
+        }
         String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT);
         if (memOvercommitStr != null) {
             tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr));
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 4acd4602432..fa4fedcc9bb 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -271,6 +271,7 @@ struct TWorkloadGroupInfo {
   15: optional i64 remote_read_bytes_per_second
   16: optional string tag
   17: optional i32 total_query_slot_count
+  18: optional i32 load_buffer_ratio
 }
 
 enum TWorkloadMetricType {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index d514c473d88..37c7da3d702 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -352,7 +352,7 @@ struct TQueryOptions {
   // The minimum memory that an operator required to run.
   137: optional i32 minimum_operator_memory_required_kb = 1024;
 
-  138: optional bool enable_query_slot_hard_limit = false;
+  138: optional bool enable_mem_overcommit = true;
 
   139: optional i32 query_slot_count = 0;
 
   140: optional bool enable_auto_create_when_overwrite = false;
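Taken together with the "if (!query_ctx->enable_mem_overcommit())" branch in update_queries_limit_() earlier in this patch, the flipped thrift default means a query stays on the soft-limit (overcommit) path unless the session variable is explicitly set to false, in which case it gets a hard, slot-derived limit. A trivial illustration of the two modes (stand-in struct, not Doris code):

    #include <cstdio>

    struct QueryOptsStub {
        bool enable_mem_overcommit = true; // matches the new thrift default
    };

    const char* limit_mode(const QueryOptsStub& opts) {
        return opts.enable_mem_overcommit ? "soft (overcommit allowed)"
                                          : "hard (slot-based)";
    }

    int main() {
        QueryOptsStub defaults;
        std::printf("default limit mode: %s\n", limit_mode(defaults));
        return 0;
    }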