This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 9c3f3fcb9a2 refactor write buffer (#42556) 9c3f3fcb9a2 is described below commit 9c3f3fcb9a2f38017b72ac9972bba077901b5fcd Author: yiguolei <676222...@qq.com> AuthorDate: Mon Oct 28 14:36:37 2024 +0800 refactor write buffer (#42556) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/olap/memtable.cpp | 8 ++++++-- be/src/olap/memtable_flush_executor.cpp | 2 ++ be/src/runtime/memory/mem_tracker_limiter.cpp | 18 +++--------------- be/src/runtime/memory/mem_tracker_limiter.h | 19 +++++-------------- be/src/runtime/workload_group/workload_group.cpp | 5 ++--- be/src/runtime/workload_group/workload_group.h | 3 +-- .../runtime/workload_group/workload_group_manager.cpp | 5 ++--- 7 files changed, 21 insertions(+), 39 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index facbc90c450..cc16cb481ed 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -78,7 +78,6 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem // TODO: Support ZOrderComparator in the future _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); _mem_tracker = std::make_shared<MemTracker>(); - _query_thread_context.query_mem_tracker->push_load_buffer(_mem_tracker); } void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, @@ -144,7 +143,8 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) { } MemTable::~MemTable() { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + _query_thread_context.query_mem_tracker->write_tracker()); if (_is_flush_success) { // If the memtable is flush success, then its memtracker's consumption should be 0 if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) { @@ -184,6 
+184,8 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r Status MemTable::insert(const vectorized::Block* input_block, const std::vector<uint32_t>& row_idxs) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + _query_thread_context.query_mem_tracker->write_tracker()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); if (_is_first_insertion) { @@ -580,6 +582,8 @@ void MemTable::_aggregate() { } void MemTable::shrink_memtable_by_agg() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + _query_thread_context.query_mem_tracker->write_tracker()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); if (_keys_type == KeysType::DUP_KEYS) { return; diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index dc911647be8..7599a3f6b86 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -142,6 +142,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in signal::set_signal_task_id(_rowset_writer->load_id()); signal::tablet_id = memtable->tablet_id(); { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + memtable->query_thread_context().query_mem_tracker->write_tracker()); SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker()); std::unique_ptr<vectorized::Block> block; RETURN_IF_ERROR(memtable->to_block(&block)); diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 487b7d48fae..c0555c2fcf3 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -82,6 +82,9 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerLimiter::create_shared(MemTrackerLi const std::string& label, int64_t byte_limit) { auto tracker = std::make_shared<MemTrackerLimiter>(type, label, byte_limit); + // Write tracker is only used to track the size, so limit == -1 + auto write_tracker = std::make_shared<MemTrackerLimiter>(type, "Memtable" + label, -1); +
tracker->_write_tracker.swap(write_tracker); #ifndef BE_TEST DCHECK(ExecEnv::tracking_memory()); std::lock_guard<std::mutex> l( @@ -257,21 +260,6 @@ void MemTrackerLimiter::clean_tracker_limiter_group() { #endif } -void MemTrackerLimiter::update_load_buffer_size() { - std::lock_guard l(_load_buffer_lock); - int64_t total_buf_size = 0; - for (auto memtable_tracker = _load_buffers.begin(); memtable_tracker != _load_buffers.end();) { - auto m = memtable_tracker->lock(); - if (m == nullptr) { - memtable_tracker = _load_buffers.erase(memtable_tracker); - } else { - total_buf_size += m->consumption(); - ++memtable_tracker; - } - } - _load_buffer_size = total_buf_size; -} - void MemTrackerLimiter::make_type_trackers_profile(RuntimeProfile* profile, MemTrackerLimiter::Type type) { if (type == Type::GLOBAL) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 6f0ecdbe975..1b03f2f2082 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -130,7 +130,6 @@ public: int64_t byte_limit = -1); // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics. MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit); - ~MemTrackerLimiter(); Type type() const { return _type; } @@ -210,8 +209,7 @@ public: if (UNLIKELY(bytes == 0)) { return true; } - // Reserve will check limit, should ignore load buffer size. 
- bool rt = _mem_counter.try_add(bytes - _load_buffer_size, _limit); + bool rt = _mem_counter.try_add(bytes, _limit); if (rt && _query_statistics) { _query_statistics->set_max_peak_memory_bytes(peak_consumption()); _query_statistics->set_current_used_memory_bytes(consumption()); @@ -237,14 +235,9 @@ public: static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num); static void make_all_tasks_tracker_profile(RuntimeProfile* profile); - void push_load_buffer(std::shared_ptr<MemTracker> memtable_tracker) { - std::lock_guard l(_load_buffer_lock); - _load_buffers.push_back(memtable_tracker); - } - - void update_load_buffer_size(); + int64_t load_buffer_size() const { return _write_tracker->consumption(); } - int64_t load_buffer_size() const { return _load_buffer_size; } + std::shared_ptr<MemTrackerLimiter> write_tracker() { return _write_tracker; } void print_log_usage(const std::string& msg); void enable_print_log_usage() { _enable_print_log_usage = true; } @@ -340,9 +333,7 @@ private: std::shared_ptr<QueryStatistics> _query_statistics = nullptr; - std::mutex _load_buffer_lock; - std::vector<std::weak_ptr<MemTracker>> _load_buffers; - std::atomic<int64_t> _load_buffer_size = 0; + std::shared_ptr<MemTrackerLimiter> _write_tracker; struct AddressSanitizer { size_t size; @@ -377,7 +368,7 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) { return Status::OK(); } // check limit should ignore memtable size, because it is treated as a cache - if (_limit > 0 && consumption() - _load_buffer_size + bytes > _limit) { + if (_limit > 0 && consumption() + bytes > _limit) { return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}", MemCounter::print_bytes(bytes), tracker_limit_exceeded_str())); diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index f7a42a34a5a..c0895e8f0fd 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ 
b/be/src/runtime/workload_group/workload_group.cpp @@ -194,7 +194,6 @@ int64_t WorkloadGroup::refresh_memory_usage() { if (tracker == nullptr) { trackerWptr = mem_tracker_group.trackers.erase(trackerWptr); } else { - tracker->update_load_buffer_size(); used_memory += tracker->consumption(); load_buffer_size += tracker->load_buffer_size(); ++trackerWptr; @@ -202,13 +201,13 @@ int64_t WorkloadGroup::refresh_memory_usage() { } } // refresh total memory used. - _total_mem_used = used_memory; + _total_mem_used = used_memory + load_buffer_size; _load_buffer_size = load_buffer_size; // reserve memory is recorded in the query mem tracker // and _total_mem_used already contains all the current reserve memory. // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth. _wg_refresh_interval_memory_growth.store(0.0); - _mem_used_status->set_value(used_memory); + _mem_used_status->set_value(_total_mem_used); return used_memory; } diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 5ec574a5381..ccdc6374ce8 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -80,7 +80,7 @@ public: int64_t total_mem_used() const { return _total_mem_used; } - int64_t load_mem_used() const { return _load_buffer_size; } + int64_t write_buffer_size() const { return _load_buffer_size; } void enable_write_buffer_limit(bool enable_limit) { _enable_write_buffer_limit = enable_limit; } @@ -190,7 +190,6 @@ public: void update_total_local_scan_io_adder(size_t scan_bytes); - int64_t get_mem_used() { return _mem_used_status->get_value(); } uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); } int64_t get_local_scan_bytes_per_second() { return _total_local_scan_io_per_second->get_value(); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 25c983b98f3..d7130ae26e4 
100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -232,7 +232,7 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { for (const auto& [id, wg] : _workload_groups) { SchemaScannerHelper::insert_int64_value(0, be_id, block); SchemaScannerHelper::insert_int64_value(1, wg->id(), block); - SchemaScannerHelper::insert_int64_value(2, wg->get_mem_used(), block); + SchemaScannerHelper::insert_int64_value(2, wg->total_mem_used(), block); double cpu_usage_p = (double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100; @@ -543,7 +543,6 @@ int64_t WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryConte return total_freed_mem; } // 2. Cancel top usage query, one by one - std::map<WorkloadGroupPtr, int64_t> wg_mem_usage; using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>; auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) { return left.second < right.second; @@ -653,7 +652,7 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha wg->check_mem_used(&is_low_wartermark, &is_high_wartermark); int64_t wg_high_water_mark_limit = (int64_t)(wg_mem_limit * wg->spill_threshold_high_water_mark() * 1.0 / 100); - int64_t memtable_usage = wg->load_mem_used(); + int64_t memtable_usage = wg->write_buffer_size(); int64_t wg_high_water_mark_except_load = wg_high_water_mark_limit; if (memtable_usage > wg->write_buffer_limit()) { wg_high_water_mark_except_load = wg_high_water_mark_limit - wg->write_buffer_limit(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org