This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f58e18a34b3 [opt](memtracker) Optimize `PODArray` memory tracking accuracy (#50549) f58e18a34b3 is described below commit f58e18a34b3298a6311c3f9fb40d7c882a6ac2eb Author: Xinyi Zou <zouxi...@selectdb.com> AuthorDate: Sat May 10 17:50:30 2025 +0800 [opt](memtracker) Optimize `PODArray` memory tracking accuracy (#50549) ### What problem does this PR solve? Based on #11740 The memory size tracked in Allocator is virtual memory. If the allocated memory is not fully used, the tracked virtual memory may be larger than the actual physical memory. `PODArray` does not memset 0 when allocating memory, and the memory blocks allocated by `PODArray` (such as VOlapScanNode::_free_blocks) are usually used for memory reuse and will not be fully used. After this PR, the unused memory in `PODArray` is recorded as reserved memory to indicate that this part of the memory is not actually used. --- be/src/olap/memtable_flush_executor.cpp | 3 +- be/src/runtime/memory/global_memory_arbitrator.cpp | 10 + be/src/runtime/memory/global_memory_arbitrator.h | 1 + be/src/runtime/memory/mem_counter.h | 5 - be/src/runtime/memory/mem_tracker.h | 4 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 7 +- be/src/runtime/memory/mem_tracker_limiter.h | 2 +- be/src/runtime/memory/memory_reclamation.cpp | 6 +- be/src/runtime/memory/thread_mem_tracker_mgr.h | 77 +++-- be/src/runtime/query_context.cpp | 6 +- be/src/runtime/workload_group/workload_group.cpp | 8 +- .../workload_group/workload_group_manager.cpp | 4 +- .../runtime/workload_management/memory_context.cpp | 6 +- .../workload_management/task_controller.cpp | 6 +- be/src/util/pretty_printer.h | 3 +- be/src/vec/common/allocator.cpp | 334 +++++++++++++++------ be/src/vec/common/allocator.h | 204 ++++--------- be/src/vec/common/allocator_fwd.h | 2 + be/src/vec/common/pod_array.h | 113 ++++++- be/src/vec/common/pod_array_fwd.h | 13 +- .../runtime/memory/thread_mem_tracker_mgr_test.cpp | 2 +- 
.../testutil/mock/mock_thread_mem_tracker_mgr.h | 5 +- be/test/vec/common/pod_array_test.cpp | 291 +++++++++++++++++- 23 files changed, 798 insertions(+), 314 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 44df6cd46f1..938b12ef5a9 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -149,7 +149,8 @@ Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& r int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s; do { // only try to reserve process memory - st = thread_context->thread_mem_tracker_mgr->try_reserve(size, true); + st = thread_context->thread_mem_tracker_mgr->try_reserve( + size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS); if (st.ok()) { memtable_flush_executor->inc_flushing_task(); break; diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp index 33d2941f884..5aed0721fff 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.cpp +++ b/be/src/runtime/memory/global_memory_arbitrator.cpp @@ -96,6 +96,16 @@ void GlobalMemoryArbitrator::refresh_memory_bvar() { memory_arbitrator_refresh_interval_growth_bytes.get_value(); } +bool GlobalMemoryArbitrator::reserve_process_memory(int64_t bytes) { + int64_t old_reserved_mem = _process_reserved_memory.load(std::memory_order_relaxed); + int64_t new_reserved_mem = 0; + do { + new_reserved_mem = old_reserved_mem + bytes; + } while (!_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem, + std::memory_order_relaxed)); + return true; +} + bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) { if (sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark()) { doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage(); diff --git a/be/src/runtime/memory/global_memory_arbitrator.h 
b/be/src/runtime/memory/global_memory_arbitrator.h index 7ac6fc4795e..7ac98b1c1f2 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.h +++ b/be/src/runtime/memory/global_memory_arbitrator.h @@ -90,6 +90,7 @@ public: return msg; } + static bool reserve_process_memory(int64_t bytes); static bool try_reserve_process_memory(int64_t bytes); static void shrink_process_reserved(int64_t bytes); diff --git a/be/src/runtime/memory/mem_counter.h b/be/src/runtime/memory/mem_counter.h index 029bf01bb70..335c91f18ee 100644 --- a/be/src/runtime/memory/mem_counter.h +++ b/be/src/runtime/memory/mem_counter.h @@ -83,11 +83,6 @@ public: int64_t current_value() const { return _current_value.load(std::memory_order_relaxed); } int64_t peak_value() const { return _peak_value.load(std::memory_order_relaxed); } - static std::string print_bytes(int64_t bytes) { - return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES) - : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES); - } - private: std::atomic<int64_t> _current_value {0}; std::atomic<int64_t> _peak_value {0}; diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index deb27ee2639..382efa82de0 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -47,8 +47,8 @@ public: const std::string& label() const { return _label; } std::string log_usage() const { return fmt::format("MemTracker name={}, Used={}({} B), Peak={}({} B)", _label, - MemCounter::print_bytes(consumption()), consumption(), - MemCounter::print_bytes(peak_consumption()), peak_consumption()); + PrettyPrinter::print_bytes(consumption()), consumption(), + PrettyPrinter::print_bytes(peak_consumption()), peak_consumption()); } private: diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 16f8ffb01f3..c564aa3691a 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ 
-360,9 +360,10 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() { std::string err_msg = fmt::format( "memory tracker limit exceeded, tracker label:{}, type:{}, limit " "{}, peak used {}, current used {}. backend {}, {}.", - label(), type_string(_type), MemCounter::print_bytes(limit()), - MemCounter::print_bytes(peak_consumption()), MemCounter::print_bytes(consumption()), - BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str()); + label(), type_string(_type), PrettyPrinter::print_bytes(limit()), + PrettyPrinter::print_bytes(peak_consumption()), + PrettyPrinter::print_bytes(consumption()), BackendOptions::get_localhost(), + GlobalMemoryArbitrator::process_memory_used_str()); if (_type == Type::QUERY || _type == Type::LOAD) { err_msg += fmt::format( " exec node:<{}>, can `set exec_mem_limit` to change limit, details see be.INFO.", diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 2051143da53..7dd66eafe1b 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -311,7 +311,7 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) { // it will not take effect. 
if (consumption() + bytes > _limit) { return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}", - MemCounter::print_bytes(bytes), + PrettyPrinter::print_bytes(bytes), tracker_limit_exceeded_str())); } return Status::OK(); diff --git a/be/src/runtime/memory/memory_reclamation.cpp b/be/src/runtime/memory/memory_reclamation.cpp index 162ae28cdcf..5f1b0d55e17 100644 --- a/be/src/runtime/memory/memory_reclamation.cpp +++ b/be/src/runtime/memory/memory_reclamation.cpp @@ -183,7 +183,7 @@ bool MemoryReclamation::revoke_process_memory(const std::string& revoke_reason) LOG(INFO) << fmt::format( "[MemoryGC] start MemoryReclamation::revoke_process_memory, {}, need free size: {}.", GlobalMemoryArbitrator::process_mem_log_str(), - MemCounter::print_bytes(MemInfo::process_full_gc_size())); + PrettyPrinter::print_bytes(MemInfo::process_full_gc_size())); Defer defer {[&]() { std::stringstream ss; profile->pretty_print(&ss); @@ -191,8 +191,8 @@ bool MemoryReclamation::revoke_process_memory(const std::string& revoke_reason) "[MemoryGC] end MemoryReclamation::revoke_process_memory, {}, need free size: {}, " "free Memory {}. cost(us): {}, details: {}", GlobalMemoryArbitrator::process_mem_log_str(), - MemCounter::print_bytes(MemInfo::process_full_gc_size()), - MemCounter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str()); + PrettyPrinter::print_bytes(MemInfo::process_full_gc_size()), + PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str()); }}; // step1: start canceling from the query with the largest memory usage until the memory of process_full_gc_size is freed. 
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index fdafd172878..44cbfbc0597 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -82,8 +82,21 @@ public: void consume(int64_t size); void flush_untracked_mem(); + enum class TryReserveChecker { + NONE = 0, + CHECK_TASK = 1, + CHECK_WORKLOAD_GROUP = 2, + CHECK_TASK_AND_WORKLOAD_GROUP = 3, + CHECK_PROCESS = 4, + CHECK_TASK_AND_PROCESS = 5, + CHECK_WORKLOAD_GROUP_AND_PROCESS = 6, + CHECK_TASK_AND_WORKLOAD_GROUP_AND_PROCESS = 7, + }; + // if only_check_process_memory == true, still reserve query, wg, process memory, only check process memory. - MOCK_FUNCTION doris::Status try_reserve(int64_t size, bool only_check_process_memory = false); + MOCK_FUNCTION doris::Status try_reserve( + int64_t size, TryReserveChecker checker = + TryReserveChecker::CHECK_TASK_AND_WORKLOAD_GROUP_AND_PROCESS); void shrink_reserved(); @@ -290,8 +303,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { _stop_consume = false; } -inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size, - bool only_check_process_memory) { +inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size, TryReserveChecker checker) { DCHECK(size >= 0); CHECK(init()); DCHECK(_limiter_tracker); @@ -301,48 +313,61 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size, flush_untracked_mem(); auto wg_ptr = _wg_wptr.lock(); - if (only_check_process_memory) { - _limiter_tracker->reserve(size); - if (wg_ptr) { - wg_ptr->add_wg_refresh_interval_memory_growth(size); - } - } else { + bool task_limit_checker = static_cast<int>(checker) & 1; + bool workload_group_limit_checker = static_cast<int>(checker) & 2; + bool process_limit_checker = static_cast<int>(checker) & 4; + + if (task_limit_checker) { if (!_limiter_tracker->try_reserve(size)) { auto err_msg = fmt::format( "reserve memory failed, size: {}, because query 
memory exceeded, memory " - "tracker " - "consumption: {}, limit: {}", - PrettyPrinter::print(size, TUnit::BYTES), - PrettyPrinter::print(_limiter_tracker->consumption(), TUnit::BYTES), - PrettyPrinter::print(_limiter_tracker->limit(), TUnit::BYTES)); + "tracker: {}, " + "consumption: {}, limit: {}, peak: {}", + PrettyPrinter::print_bytes(size), _limiter_tracker->label(), + PrettyPrinter::print_bytes(_limiter_tracker->consumption()), + PrettyPrinter::print_bytes(_limiter_tracker->limit()), + PrettyPrinter::print_bytes(_limiter_tracker->peak_consumption())); return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg); } - if (wg_ptr) { + } else { + _limiter_tracker->reserve(size); + } + + if (wg_ptr) { + if (workload_group_limit_checker) { if (!wg_ptr->try_add_wg_refresh_interval_memory_growth(size)) { auto err_msg = fmt::format( "reserve memory failed, size: {}, because workload group memory exceeded, " "workload group: {}", - PrettyPrinter::print(size, TUnit::BYTES), wg_ptr->memory_debug_string()); + PrettyPrinter::print_bytes(size), wg_ptr->memory_debug_string()); _limiter_tracker->release(size); // rollback _limiter_tracker->shrink_reserved(size); // rollback return doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg); } + } else { + wg_ptr->add_wg_refresh_interval_memory_growth(size); } } - if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) { - auto err_msg = - fmt::format("reserve memory failed, size: {}, because proccess memory exceeded, {}", - PrettyPrinter::print(size, TUnit::BYTES), - GlobalMemoryArbitrator::process_mem_log_str()); - _limiter_tracker->release(size); // rollback - _limiter_tracker->shrink_reserved(size); // rollback - if (wg_ptr) { - wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback + if (process_limit_checker) { + if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) { + auto err_msg = fmt::format( + "reserve memory failed, size: {}, because proccess memory 
exceeded, {}", + PrettyPrinter::print_bytes(size), + GlobalMemoryArbitrator::process_mem_log_str()); + _limiter_tracker->release(size); // rollback + _limiter_tracker->shrink_reserved(size); // rollback + if (wg_ptr) { + wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback + } + return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(err_msg); } - return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(err_msg); + } else { + doris::GlobalMemoryArbitrator::reserve_process_memory(size); } + _reserved_mem += size; + DCHECK(_reserved_mem >= 0); return doris::Status::OK(); } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 2853f2356ca..82124e75552 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -202,9 +202,9 @@ QueryContext::~QueryContext() { mem_tracker_msg = fmt::format( "deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, " "PeakUsed={}", - print_id(_query_id), MemCounter::print_bytes(query_mem_tracker()->limit()), - MemCounter::print_bytes(query_mem_tracker()->consumption()), - MemCounter::print_bytes(query_mem_tracker()->peak_consumption())); + print_id(_query_id), PrettyPrinter::print_bytes(query_mem_tracker()->limit()), + PrettyPrinter::print_bytes(query_mem_tracker()->consumption()), + PrettyPrinter::print_bytes(query_mem_tracker()->peak_consumption())); } [[maybe_unused]] uint64_t group_id = 0; if (workload_group()) { diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 794a9087d00..83690a79eb0 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -257,18 +257,18 @@ int64_t WorkloadGroup::revoke_memory(int64_t need_free_mem, const std::string& r auto group_revoke_reason = fmt::format( "{}, revoke group id:{}, name:{}, used:{}, limit:{}", revoke_reason, _id, _name, - MemCounter::print_bytes(used_memory), 
MemCounter::print_bytes(_memory_limit)); + PrettyPrinter::print_bytes(used_memory), PrettyPrinter::print_bytes(_memory_limit)); LOG(INFO) << fmt::format( "[MemoryGC] start WorkloadGroup::revoke_memory, {}, need free size: {}.", - group_revoke_reason, MemCounter::print_bytes(need_free_mem)); + group_revoke_reason, PrettyPrinter::print_bytes(need_free_mem)); Defer defer {[&]() { std::stringstream ss; group_revoke_profile->pretty_print(&ss); LOG(INFO) << fmt::format( "[MemoryGC] end WorkloadGroup::revoke_memory, {}, need free size: {}, free Memory " "{}. cost(us): {}, details: {}", - group_revoke_reason, MemCounter::print_bytes(need_free_mem), - MemCounter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str()); + group_revoke_reason, PrettyPrinter::print_bytes(need_free_mem), + PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str()); }}; // step 1. free top overcommit query diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 8768541b909..5b47535e464 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -699,14 +699,14 @@ int64_t WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_( "[MemoryGC] start WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_, {}, " "number of overcommited groups: {}, total exceeded memory: {}.", revoke_reason, exceeded_memory_heap.size(), - MemCounter::print_bytes(total_exceeded_memory)); + PrettyPrinter::print_bytes(total_exceeded_memory)); Defer defer {[&]() { std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( "[MemoryGC] end WorkloadGroupMgr::revoke_memory_from_other_overcommited_groups_, " "{}, number of overcommited groups: {}, free memory {}. 
cost(us): {}, details: {}", - revoke_reason, exceeded_memory_heap.size(), MemCounter::print_bytes(freed_mem), + revoke_reason, exceeded_memory_heap.size(), PrettyPrinter::print_bytes(freed_mem), watch.elapsed_time() / 1000, ss.str()); }}; diff --git a/be/src/runtime/workload_management/memory_context.cpp b/be/src/runtime/workload_management/memory_context.cpp index 211d3539a3c..e1198c24f5c 100644 --- a/be/src/runtime/workload_management/memory_context.cpp +++ b/be/src/runtime/workload_management/memory_context.cpp @@ -25,9 +25,9 @@ namespace doris { std::string MemoryContext::debug_string() { return fmt::format("TaskId={}, Memory(Used={}, Limit={}, Peak={})", print_id(resource_ctx_->task_controller()->task_id()), - MemCounter::print_bytes(current_memory_bytes()), - MemCounter::print_bytes(mem_limit()), - MemCounter::print_bytes(peak_memory_bytes())); + PrettyPrinter::print_bytes(current_memory_bytes()), + PrettyPrinter::print_bytes(mem_limit()), + PrettyPrinter::print_bytes(peak_memory_bytes())); } #include "common/compile_check_end.h" diff --git a/be/src/runtime/workload_management/task_controller.cpp b/be/src/runtime/workload_management/task_controller.cpp index a025f44fbba..b65c9a57d5b 100644 --- a/be/src/runtime/workload_management/task_controller.cpp +++ b/be/src/runtime/workload_management/task_controller.cpp @@ -42,9 +42,9 @@ std::string TaskController::debug_string() { "TaskId={}, Memory(Used={}, Limit={}, Peak={}), Spill(RunningSpillTaskCnt={}, " "TotalPausedPeriodSecs={}, LatestPausedReason={})", print_id(task_id_), - MemCounter::print_bytes(resource_ctx_->memory_context()->current_memory_bytes()), - MemCounter::print_bytes(resource_ctx_->memory_context()->mem_limit()), - MemCounter::print_bytes(resource_ctx_->memory_context()->peak_memory_bytes()), + PrettyPrinter::print_bytes(resource_ctx_->memory_context()->current_memory_bytes()), + PrettyPrinter::print_bytes(resource_ctx_->memory_context()->mem_limit()), + 
PrettyPrinter::print_bytes(resource_ctx_->memory_context()->peak_memory_bytes()), revoking_tasks_count_, memory_sufficient_time() / NANOS_PER_SEC, paused_reason_.status().to_string()); } diff --git a/be/src/util/pretty_printer.h b/be/src/util/pretty_printer.h index 5e0d6fdfc9e..69cef37da31 100644 --- a/be/src/util/pretty_printer.h +++ b/be/src/util/pretty_printer.h @@ -186,7 +186,8 @@ public: /// Convenience method static std::string print_bytes(int64_t value) { - return PrettyPrinter::print(value, TUnit::BYTES); + return value >= 0 ? PrettyPrinter::print(value, TUnit::BYTES) + : "-" + PrettyPrinter::print(std::abs(value), TUnit::BYTES); } private: diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index c3e9dffaaa6..6ae215e993d 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -46,14 +46,54 @@ std::unordered_map<void*, size_t> RecordSizeMemoryAllocator::_allocated_sizes; std::mutex RecordSizeMemoryAllocator::_mutex; template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> -void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_memory_check( - size_t size) const { +bool Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_memory_exceed( + size_t size, std::string* err_msg) const { #ifdef BE_TEST - if (!doris::ExecEnv::ready()) { + if (!doris::pthread_context_ptr_init) { + return false; + } +#endif + auto* thread_mem_ctx = doris::thread_context()->thread_mem_tracker_mgr.get(); + if (thread_mem_ctx->skip_memory_check != 0) { + return false; + } + + if (doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) { + // Only thread attach task, and has not completely waited for thread_wait_gc_max_milliseconds, + // will wait for gc. otherwise, if the outside will catch the exception, throwing an exception. 
+ *err_msg += fmt::format( + "Allocator sys memory check failed: Cannot alloc:{}, consuming " + "tracker:<{}>, peak used {}, current used {}, reserved {}, exec node:<{}>, {}.", + doris::PrettyPrinter::print_bytes(size), + thread_mem_ctx->limiter_mem_tracker()->label(), + doris::PrettyPrinter::print_bytes( + thread_mem_ctx->limiter_mem_tracker()->peak_consumption()), + doris::PrettyPrinter::print_bytes( + thread_mem_ctx->limiter_mem_tracker()->consumption()), + doris::PrettyPrinter::print_bytes( + thread_mem_ctx->limiter_mem_tracker()->reserved_consumption()), + thread_mem_ctx->last_consumer_tracker_label(), + doris::GlobalMemoryArbitrator::process_mem_log_str()); + + if (doris::config::stacktrace_in_alloc_large_memory_bytes > 0 && + size > doris::config::stacktrace_in_alloc_large_memory_bytes) { + *err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace(); + } + return true; + } + return false; +} + +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::alloc_fault_probability() + const { +#ifdef BE_TEST + if (!doris::pthread_context_ptr_init) { return; } #endif - if (doris::thread_context()->thread_mem_tracker_mgr->skip_memory_check != 0) { + auto* thread_mem_ctx = doris::thread_context()->thread_mem_tracker_mgr.get(); + if (thread_mem_ctx->skip_memory_check != 0) { return; } @@ -65,9 +105,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_mem const std::string injection_err_msg = fmt::format( "[MemAllocInjectFault] Task {} alloc memory failed due to fault " "injection.", - doris::thread_context() - ->thread_mem_tracker_mgr->limiter_mem_tracker() - ->label()); + thread_mem_ctx->limiter_mem_tracker()->label()); // Print stack trace for debug. 
[[maybe_unused]] auto stack_trace_st = doris::Status::Error<doris::ErrorCode::MEM_ALLOC_FAILED, true>( @@ -80,41 +118,19 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_mem } } } +} - if (doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) { - // Only thread attach task, and has not completely waited for thread_wait_gc_max_milliseconds, - // will wait for gc. otherwise, if the outside will catch the exception, throwing an exception. - std::string err_msg; - err_msg += fmt::format( - "Allocator sys memory check failed: Cannot alloc:{}, consuming " - "tracker:<{}>, peak used {}, current used {}, reserved {}, exec node:<{}>, {}.", - doris::PrettyPrinter::print_bytes(size), - doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(), - doris::PrettyPrinter::print_bytes( - doris::thread_context() - ->thread_mem_tracker_mgr->limiter_mem_tracker() - ->peak_consumption()), - doris::PrettyPrinter::print_bytes( - doris::thread_context() - ->thread_mem_tracker_mgr->limiter_mem_tracker() - ->consumption()), - doris::PrettyPrinter::print_bytes( - doris::thread_context() - ->thread_mem_tracker_mgr->limiter_mem_tracker() - ->reserved_consumption()), - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(), - doris::GlobalMemoryArbitrator::process_mem_log_str()); - - if (doris::config::stacktrace_in_alloc_large_memory_bytes > 0 && - size > doris::config::stacktrace_in_alloc_large_memory_bytes) { - err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace(); - } - - if (doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) { +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_memory_check( + size_t size) const { + std::string err_msg; + if (sys_memory_exceed(size, &err_msg)) { + auto* thread_mem_ctx = doris::thread_context()->thread_mem_tracker_mgr.get(); + if 
(thread_mem_ctx->wait_gc()) { int64_t wait_milliseconds = 0; LOG(INFO) << fmt::format( "Task:{} waiting for enough memory in thread id:{}, maximum {}ms, {}.", - doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(), + thread_mem_ctx->limiter_mem_tracker()->label(), doris::ThreadContext::get_thread_id(), doris::config::thread_wait_gc_max_milliseconds, err_msg); @@ -144,7 +160,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_mem // else, enough memory is available, the task continues execute. if (wait_milliseconds >= doris::config::thread_wait_gc_max_milliseconds) { // Make sure to completely wait thread_wait_gc_max_milliseconds only once. - doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); + thread_mem_ctx->disable_wait_gc(); doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage(); // If the outside will catch the exception, after throwing an exception, @@ -153,62 +169,61 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_mem LOG(INFO) << fmt::format( "Task:{} sys memory check failed, throw exception, after waiting for " "memory {}ms, {}.", - doris::thread_context() - ->thread_mem_tracker_mgr->limiter_mem_tracker() - ->label(), - wait_milliseconds, err_msg); + thread_mem_ctx->limiter_mem_tracker()->label(), wait_milliseconds, + err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } else { LOG(INFO) << fmt::format( "Task:{} sys memory check failed, will continue to execute, cannot " "throw exception, after waiting for memory {}ms, {}.", - doris::thread_context() - ->thread_mem_tracker_mgr->limiter_mem_tracker() - ->label(), - wait_milliseconds, err_msg); + thread_mem_ctx->limiter_mem_tracker()->label(), wait_milliseconds, + err_msg); } } } else if (doris::enable_thread_catch_bad_alloc) { - LOG(INFO) << fmt::format("sys memory check failed, throw exception, {}.", err_msg); + LOG(INFO) << fmt::format("{}, throw 
exception.", err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } else { - LOG(INFO) << fmt::format("sys memory check failed, no throw exception, {}.", err_msg); + LOG(INFO) << fmt::format("{}, no throw exception.", err_msg); } } } template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> -void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_tracker_check( - size_t size) const { +bool Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_tracker_exceed( + size_t size, std::string* err_msg) const { #ifdef BE_TEST - if (!doris::ExecEnv::ready()) { - return; + if (!doris::pthread_context_ptr_init) { + return false; } #endif - if (doris::thread_context()->thread_mem_tracker_mgr->skip_memory_check != 0) { - return; + auto* thread_mem_ctx = doris::thread_context()->thread_mem_tracker_mgr.get(); + if (thread_mem_ctx->skip_memory_check != 0) { + return false; } - auto st = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit( - size); + + auto st = thread_mem_ctx->limiter_mem_tracker()->check_limit(size); if (!st) { - auto err_msg = fmt::format("Allocator mem tracker check failed, {}", st.to_string()); - doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->print_log_usage( - err_msg); + *err_msg += fmt::format("Allocator mem tracker check failed, {}", st.to_string()); + thread_mem_ctx->limiter_mem_tracker()->print_log_usage(*err_msg); + return true; + } + return false; +} + +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_tracker_check( + size_t size) const { + std::string err_msg; + if (memory_tracker_exceed(size, &err_msg)) { doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); // If the outside will catch the exception, after throwing an exception, // the task will actively 
cancel itself. if (doris::enable_thread_catch_bad_alloc) { - LOG(INFO) << fmt::format( - "Task:{} memory tracker check failed, throw exception, {}.", - doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(), - err_msg); + LOG(INFO) << fmt::format("{}, throw exception.", err_msg); throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); } else { - LOG(INFO) << fmt::format( - "Task:{} memory tracker check failed, will continue to execute, no throw " - "exception, {}.", - doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(), - err_msg); + LOG(INFO) << fmt::format("{}, will continue to execute, no throw exception.", err_msg); } } } @@ -216,20 +231,27 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_ template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_check( size_t size) const { - sys_memory_check(size); - memory_tracker_check(size); + if (MemoryAllocator::need_check_and_tracking_memory()) { + alloc_fault_probability(); + sys_memory_check(size); + memory_tracker_check(size); + } } template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::consume_memory( size_t size) const { - CONSUME_THREAD_MEM_TRACKER(size); + if (MemoryAllocator::need_check_and_tracking_memory()) { + CONSUME_THREAD_MEM_TRACKER(size); + } } template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::release_memory( size_t size) const { - RELEASE_THREAD_MEM_TRACKER(size); + if (MemoryAllocator::need_check_and_tracking_memory()) { + RELEASE_THREAD_MEM_TRACKER(size); + } } template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> @@ -246,11 +268,6 @@ 
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::throw_b template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::add_address_sanitizers( void* buf, size_t size) const { -#ifdef BE_TEST - if (!doris::ExecEnv::ready()) { - return; - } -#endif if (!doris::config::crash_in_memory_tracker_inaccurate) { return; } @@ -261,11 +278,6 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::add_add template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::remove_address_sanitizers( void* buf, size_t size) const { -#ifdef BE_TEST - if (!doris::ExecEnv::ready()) { - return; - } -#endif if (!doris::config::crash_in_memory_tracker_inaccurate) { return; } @@ -277,13 +289,156 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::remove_ template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::alloc(size_t size, size_t alignment) { - return alloc_impl(size, alignment); + memory_check(size); + // consume memory in tracker before alloc, similar to early declaration. 
+ consume_memory(size); + void* buf; + size_t record_size = size; + + if (use_mmap && size >= doris::config::mmap_threshold) { + if (alignment > MMAP_MIN_ALIGNMENT) { + throw doris::Exception( + doris::ErrorCode::INVALID_ARGUMENT, + "Too large alignment {}: more than page size when allocating {}.", alignment, + size); + } + + buf = mmap(nullptr, size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); + if (MAP_FAILED == buf) { + release_memory(size); + throw_bad_alloc(fmt::format("Allocator: Cannot mmap {}.", size)); + } + if constexpr (MemoryAllocator::need_record_actual_size()) { + record_size = MemoryAllocator::allocated_size(buf); + } + + /// No need for zero-fill, because mmap guarantees it. + } else { + if (alignment <= MALLOC_MIN_ALIGNMENT) { + if constexpr (clear_memory) { + buf = MemoryAllocator::calloc(size, 1); + } else { + buf = MemoryAllocator::malloc(size); + } + + if (nullptr == buf) { + release_memory(size); + throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", size)); + } + if constexpr (MemoryAllocator::need_record_actual_size()) { + record_size = MemoryAllocator::allocated_size(buf); + } + add_address_sanitizers(buf, record_size); + } else { + buf = nullptr; + int res = MemoryAllocator::posix_memalign(&buf, alignment, size); + + if (0 != res) { + release_memory(size); + throw_bad_alloc(fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); + } + + if constexpr (clear_memory) { + memset(buf, 0, size); + } + + if constexpr (MemoryAllocator::need_record_actual_size()) { + record_size = MemoryAllocator::allocated_size(buf); + } + add_address_sanitizers(buf, record_size); + } + } + if constexpr (MemoryAllocator::need_record_actual_size()) { + consume_memory(record_size - size); + } + return buf; +} + +template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> +void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::free(void* buf, + size_t size) { + if (use_mmap && size >= 
doris::config::mmap_threshold) { + if (0 != munmap(buf, size)) { + throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); + } + } else { + remove_address_sanitizers(buf, size); + MemoryAllocator::free(buf); + } + release_memory(size); } template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::realloc( void* buf, size_t old_size, size_t new_size, size_t alignment) { - return realloc_impl(buf, old_size, new_size, alignment); + if (old_size == new_size) { + /// nothing to do. + /// BTW, it's not possible to change alignment while doing realloc. + return buf; + } + memory_check(new_size); + // Realloc can do 2 possible things: + // - expand existing memory region + // - allocate new memory block and free the old one + // Because we don't know which option will be picked we need to make sure there is enough + // memory for all options. + consume_memory(new_size); + + if (!use_mmap || + (old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold && + alignment <= MALLOC_MIN_ALIGNMENT)) { + remove_address_sanitizers(buf, old_size); + /// Resize malloc'd memory region with no special alignment requirement. + void* new_buf = MemoryAllocator::realloc(buf, new_size); + if (nullptr == new_buf) { + release_memory(new_size); + throw_bad_alloc( + fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, new_size)); + } + // usually, buf addr = new_buf addr, asan maybe not equal. + add_address_sanitizers(new_buf, new_size); + + buf = new_buf; + release_memory(old_size); + if constexpr (clear_memory) { + if (new_size > old_size) { + memset(reinterpret_cast<char*>(buf) + old_size, 0, new_size - old_size); + } + } + } else if (old_size >= doris::config::mmap_threshold && + new_size >= doris::config::mmap_threshold) { + /// Resize mmap'd memory region. 
+ // On apple and freebsd self-implemented mremap used (common/mremap.h) + buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, + mmap_flags, -1, 0); + if (MAP_FAILED == buf) { + release_memory(new_size); + throw_bad_alloc(fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.", + old_size, new_size)); + } + release_memory(old_size); + + /// No need for zero-fill, because mmap guarantees it. + + if constexpr (mmap_populate) { + // MAP_POPULATE seems have no effect for mremap as for mmap, + // Clear enlarged memory range explicitly to pre-fault the pages + if (new_size > old_size) { + memset(reinterpret_cast<char*>(buf) + old_size, 0, new_size - old_size); + } + } + } else { + // Big allocs that requires a copy. + void* new_buf = alloc(new_size, alignment); + memcpy(new_buf, buf, std::min(old_size, new_size)); + add_address_sanitizers(new_buf, new_size); + remove_address_sanitizers(buf, old_size); + free(buf, old_size); + buf = new_buf; + release_memory(old_size); + } + + return buf; } template class Allocator<true, true, true, DefaultMemoryAllocator>; @@ -295,6 +450,15 @@ template class Allocator<false, true, false, DefaultMemoryAllocator>; template class Allocator<false, false, true, DefaultMemoryAllocator>; template class Allocator<false, false, false, DefaultMemoryAllocator>; +template class Allocator<true, true, true, NoTrackingDefaultMemoryAllocator>; +template class Allocator<true, true, false, NoTrackingDefaultMemoryAllocator>; +template class Allocator<true, false, true, NoTrackingDefaultMemoryAllocator>; +template class Allocator<true, false, false, NoTrackingDefaultMemoryAllocator>; +template class Allocator<false, true, true, NoTrackingDefaultMemoryAllocator>; +template class Allocator<false, true, false, NoTrackingDefaultMemoryAllocator>; +template class Allocator<false, false, true, NoTrackingDefaultMemoryAllocator>; +template class Allocator<false, false, false, NoTrackingDefaultMemoryAllocator>; + /** It 
would be better to put these Memory Allocators where they are used, such as in the orc memory pool and arrow memory pool. * But currently allocators use templates in .cpp instead of all in .h, so they can only be placed here. */ diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 881e05c3117..ab3e699c617 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -112,6 +112,13 @@ public: nullptr, 0); #endif // defined(USE_JEMALLOC) } + + static constexpr bool need_check_and_tracking_memory() { return true; } +}; + +class NoTrackingDefaultMemoryAllocator : public DefaultMemoryAllocator { +public: + static constexpr bool need_check_and_tracking_memory() { return false; } }; /** It would be better to put these Memory Allocators where they are used, such as in the orc memory pool and arrow memory pool. @@ -139,6 +146,8 @@ public: static void free(void* p) __THROW { std::free(p); } static void release_unused() {} + + static constexpr bool need_check_and_tracking_memory() { return true; } }; class RecordSizeMemoryAllocator { @@ -208,6 +217,8 @@ public: static void release_unused() {} + static constexpr bool need_check_and_tracking_memory() { return true; } + private: static std::unordered_map<void*, size_t> _allocated_sizes; static std::mutex _mutex; @@ -226,11 +237,41 @@ private: template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> class Allocator { public: + // Allocate memory range. + void* alloc(size_t size, size_t alignment = 0); + + /** Enlarge memory range. + * Data from old range is moved to the beginning of new range. + * Address of memory range could change. + */ + void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0); + + // Free memory range. 
+ void free(void* buf, size_t size); + + void release_unused() { MemoryAllocator::release_unused(); } + + bool sys_memory_exceed(size_t size, std::string* err_msg) const; + + bool memory_tracker_exceed(size_t size, std::string* err_msg) const; + + static constexpr bool need_check_and_tracking_memory() { + return MemoryAllocator::need_check_and_tracking_memory(); + } + +protected: + static constexpr size_t get_stack_threshold() { return 0; } + + static constexpr bool clear_memory = clear_memory_; + +private: void sys_memory_check(size_t size) const; void memory_tracker_check(size_t size) const; // If sys memory or tracker exceeds the limit, but there is no external catch bad_alloc, // alloc will continue to execute, so the consume memtracker is forced. void memory_check(size_t size) const; + void alloc_fault_probability() const; + // Increases consumption of this tracker by 'bytes'. // some special cases: // 1. objects that inherit Allocator will not be shared by multiple queries. @@ -245,157 +286,6 @@ public: void add_address_sanitizers(void* buf, size_t size) const; void remove_address_sanitizers(void* buf, size_t size) const; - void* alloc(size_t size, size_t alignment = 0); - void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0); - - /// Allocate memory range. - void* alloc_impl(size_t size, size_t alignment = 0) { - memory_check(size); - // consume memory in tracker before alloc, similar to early declaration. 
- consume_memory(size); - void* buf; - size_t record_size = size; - - if (use_mmap && size >= doris::config::mmap_threshold) { - if (alignment > MMAP_MIN_ALIGNMENT) - throw doris::Exception( - doris::ErrorCode::INVALID_ARGUMENT, - "Too large alignment {}: more than page size when allocating {}.", - alignment, size); - - buf = mmap(nullptr, size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); - if (MAP_FAILED == buf) { - release_memory(size); - throw_bad_alloc(fmt::format("Allocator: Cannot mmap {}.", size)); - } - if constexpr (MemoryAllocator::need_record_actual_size()) { - record_size = MemoryAllocator::allocated_size(buf); - } - - /// No need for zero-fill, because mmap guarantees it. - } else { - if (alignment <= MALLOC_MIN_ALIGNMENT) { - if constexpr (clear_memory) - buf = MemoryAllocator::calloc(size, 1); - else - buf = MemoryAllocator::malloc(size); - - if (nullptr == buf) { - release_memory(size); - throw_bad_alloc(fmt::format("Allocator: Cannot malloc {}.", size)); - } - if constexpr (MemoryAllocator::need_record_actual_size()) { - record_size = MemoryAllocator::allocated_size(buf); - } - add_address_sanitizers(buf, record_size); - } else { - buf = nullptr; - int res = MemoryAllocator::posix_memalign(&buf, alignment, size); - - if (0 != res) { - release_memory(size); - throw_bad_alloc( - fmt::format("Cannot allocate memory (posix_memalign) {}.", size)); - } - - if constexpr (clear_memory) memset(buf, 0, size); - - if constexpr (MemoryAllocator::need_record_actual_size()) { - record_size = MemoryAllocator::allocated_size(buf); - } - add_address_sanitizers(buf, record_size); - } - } - if constexpr (MemoryAllocator::need_record_actual_size()) { - consume_memory(record_size - size); - } - return buf; - } - - /// Free memory range. 
- void free(void* buf, size_t size) { - if (use_mmap && size >= doris::config::mmap_threshold) { - if (0 != munmap(buf, size)) { - throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); - } - } else { - remove_address_sanitizers(buf, size); - MemoryAllocator::free(buf); - } - release_memory(size); - } - - void release_unused() { MemoryAllocator::release_unused(); } - - /** Enlarge memory range. - * Data from old range is moved to the beginning of new range. - * Address of memory range could change. - */ - void* realloc_impl(void* buf, size_t old_size, size_t new_size, size_t alignment = 0) { - if (old_size == new_size) { - /// nothing to do. - /// BTW, it's not possible to change alignment while doing realloc. - return buf; - } - memory_check(new_size); - consume_memory(new_size - old_size); - - if (!use_mmap || - (old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold && - alignment <= MALLOC_MIN_ALIGNMENT)) { - remove_address_sanitizers(buf, old_size); - /// Resize malloc'd memory region with no special alignment requirement. - void* new_buf = MemoryAllocator::realloc(buf, new_size); - if (nullptr == new_buf) { - release_memory(new_size - old_size); - throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size, - new_size)); - } - // usually, buf addr = new_buf addr, asan maybe not equal. - add_address_sanitizers(new_buf, new_size); - - buf = new_buf; - if constexpr (clear_memory) - if (new_size > old_size) - memset(reinterpret_cast<char*>(buf) + old_size, 0, new_size - old_size); - } else if (old_size >= doris::config::mmap_threshold && - new_size >= doris::config::mmap_threshold) { - /// Resize mmap'd memory region. 
- // On apple and freebsd self-implemented mremap used (common/mremap.h) - buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, - mmap_flags, -1, 0); - if (MAP_FAILED == buf) { - release_memory(new_size - old_size); - throw_bad_alloc(fmt::format("Allocator: Cannot mremap memory chunk from {} to {}.", - old_size, new_size)); - } - - /// No need for zero-fill, because mmap guarantees it. - - if constexpr (mmap_populate) { - // MAP_POPULATE seems have no effect for mremap as for mmap, - // Clear enlarged memory range explicitly to pre-fault the pages - if (new_size > old_size) - memset(reinterpret_cast<char*>(buf) + old_size, 0, new_size - old_size); - } - } else { - // Big allocs that requires a copy. - void* new_buf = alloc(new_size, alignment); - memcpy(new_buf, buf, std::min(old_size, new_size)); - add_address_sanitizers(new_buf, new_size); - remove_address_sanitizers(buf, old_size); - free(buf, old_size); - buf = new_buf; - } - - return buf; - } - -protected: - static constexpr size_t get_stack_threshold() { return 0; } - - static constexpr bool clear_memory = clear_memory_; - // Freshly mmapped pages are copy-on-write references to a global zero page. // On the first write, a page fault occurs, and an actual writable page is // allocated. 
If we are going to use this memory soon, such as when resizing @@ -452,6 +342,18 @@ public: return new_buf; } + bool sys_memory_exceed(size_t size, std::string* err_msg) const { + return Base::sys_memory_exceed(size, err_msg); + } + + bool memory_tracker_exceed(size_t size, std::string* err_msg) const { + return Base::memory_tracker_exceed(size, err_msg); + } + + static constexpr bool need_check_and_tracking_memory() { + return Base::need_check_and_tracking_memory(); + } + protected: static constexpr size_t get_stack_threshold() { return N; } }; diff --git a/be/src/vec/common/allocator_fwd.h b/be/src/vec/common/allocator_fwd.h index b20e88aa6fc..42f039b3b2a 100644 --- a/be/src/vec/common/allocator_fwd.h +++ b/be/src/vec/common/allocator_fwd.h @@ -26,6 +26,8 @@ #include <cstddef> namespace doris { class DefaultMemoryAllocator; +class NoTrackingDefaultMemoryAllocator; + template <bool clear_memory_, bool mmap_populate = false, bool use_mmap = false, typename MemoryAllocator = DefaultMemoryAllocator> class Allocator; diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h index a8abce8385f..98823fffd42 100644 --- a/be/src/vec/common/pod_array.h +++ b/be/src/vec/common/pod_array.h @@ -33,6 +33,7 @@ #include <utility> #include "common/compiler_util.h" // IWYU pragma: keep +#include "runtime/thread_context.h" #include "vec/common/allocator.h" // IWYU pragma: keep #include "vec/common/memcpy_small.h" @@ -82,6 +83,16 @@ inline size_t round_up_to_power_of_two_or_zero(size_t n) { * and zero initialize -1th element. It allows to use -1th element that will have value 0. * This gives performance benefits when converting an array of offsets to array of sizes. 
* + * If reserve 4096 bytes, used 512 bytes, pad_left = 16, pad_right = 15, the structure of PODArray is as follows: + * + * 16 bytes 512 bytes 3553 bytes 15 bytes + * pad_left ----- c_start -------------c_end ---------------------------- c_end_of_storage ------------- pad_right + * ^ ^ ^ + * | | | + * | c_res_mem (usually > c_end && < c_end + PRE_GROWTH_SIZE) | + * | | + * +-------------------------------------- allocated_bytes (4096 bytes) -----------------------------------+ + * * Some methods using allocator have TAllocatorParams variadic arguments. * These arguments will be passed to corresponding methods of TAllocator. * Example: pointer to Arena, that is used for allocations. @@ -98,6 +109,7 @@ inline size_t round_up_to_power_of_two_or_zero(size_t n) { * TODO Allow greater alignment than alignof(T). Example: array of char aligned to page size. */ static constexpr size_t EmptyPODArraySize = 1024; +static constexpr size_t PRE_GROWTH_SIZE = (1ULL << 20); // 1M extern const char empty_pod_array[EmptyPODArraySize]; /** Base class that depend only on size of element, not on element itself. @@ -122,6 +134,7 @@ protected: char* c_start = null; /// Does not include pad_left. char* c_end = null; char* c_end_of_storage = null; /// Does not include pad_right. + char* c_res_mem = null; /// The amount of memory occupied by the num_elements of the elements. 
static size_t byte_size(size_t num_elements) { @@ -143,6 +156,42 @@ protected: return byte_size(num_elements) + pad_right + pad_left; } + inline void check_memory(int64_t size) { + std::string err_msg; + if (TAllocator::sys_memory_exceed(size, &err_msg) || + TAllocator::memory_tracker_exceed(size, &err_msg)) { + err_msg = fmt::format("PODArray reserve memory failed, {}.", err_msg); + if (doris::enable_thread_catch_bad_alloc) { + LOG(WARNING) << err_msg; + throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg); + } else { + LOG_EVERY_N(WARNING, 1024) << err_msg; + } + } + } + + inline void reset_resident_memory(const char* c_end_new) { + static_assert(!TAllocator::need_check_and_tracking_memory(), + "TAllocator should specify `NoTrackingDefaultMemoryAllocator`"); + if (UNLIKELY(c_end_new - c_res_mem > 0)) { + // - allocated_bytes = c_end_of_storage - c_start = 4 MB; + // - used_bytes = c_end_new - c_start = 2.1 MB; + // - last tracking_res_memory = c_res_mem - c_start = 1 MB; + // - res_mem_growth = min(allocated_bytes, integerRoundUp(used_bytes)) - last_tracking_res_memory = 3 - 1 = 2 MB; + // - update tracking_res_memory = 1 + 2 = 3 MB; + // so after each reset_resident_memory, tracking_res_memory >= used_bytes; + int64_t res_mem_growth = + std::min(static_cast<size_t>(c_end_of_storage - c_start), + integerRoundUp(c_end_new - c_start, PRE_GROWTH_SIZE)) - + (c_res_mem - c_start); + check_memory(res_mem_growth); + CONSUME_THREAD_MEM_TRACKER(res_mem_growth); + c_res_mem = c_res_mem + res_mem_growth; + } + } + + inline void reset_resident_memory() { reset_resident_memory(c_end); } + void alloc_for_num_elements(size_t num_elements) { alloc(round_up_to_power_of_two_or_zero(minimum_memory_for_elements(num_elements))); } @@ -154,6 +203,7 @@ protected: c_start = allocated + pad_left; c_end = c_start; + c_res_mem = c_start; c_end_of_storage = allocated + bytes - pad_right; if (pad_left) memset(c_start - ELEMENT_SIZE, 0, ELEMENT_SIZE); @@ -161,9 +211,8 @@ 
protected: void dealloc() { if (c_start == null) return; - unprotect(); - + RELEASE_THREAD_MEM_TRACKER((c_res_mem - c_start)); TAllocator::free(c_start - pad_left, allocated_bytes()); } @@ -177,13 +226,21 @@ protected: unprotect(); ptrdiff_t end_diff = c_end - c_start; - + ptrdiff_t res_mem_diff = c_res_mem - c_start; + + // Realloc can do 2 possible things: + // - expand existing memory region + // - allocate new memory block and free the old one + // Because we don't know which option will be picked we need to make sure there is enough + // memory for all options. + check_memory(res_mem_diff); char* allocated = reinterpret_cast<char*>( TAllocator::realloc(c_start - pad_left, allocated_bytes(), bytes, std::forward<TAllocatorParams>(allocator_params)...)); c_start = allocated + pad_left; c_end = c_start + end_diff; + c_res_mem = c_start + res_mem_diff; c_end_of_storage = allocated + bytes - pad_right; } @@ -259,7 +316,10 @@ public: resize_assume_reserved(n); } - void resize_assume_reserved(const size_t n) { c_end = c_start + byte_size(n); } + void resize_assume_reserved(const size_t n) { + c_end = c_start + byte_size(n); + reset_resident_memory(); + } const char* raw_data() const { return c_start; } @@ -268,6 +328,7 @@ public: if (UNLIKELY(c_end == c_end_of_storage)) reserve_for_next_size(std::forward<TAllocatorParams>(allocator_params)...); + reset_resident_memory(c_end + byte_size(1)); memcpy(c_end, ptr, ELEMENT_SIZE); c_end += byte_size(1); } @@ -328,6 +389,7 @@ public: PODArray(size_t n) { this->alloc_for_num_elements(n); this->c_end += this->byte_size(n); + this->reset_resident_memory(); } PODArray(size_t n, const T& x) { @@ -379,25 +441,36 @@ public: const_iterator cend() const { return t_end(); } void* get_end_ptr() const { return this->c_end; } - void set_end_ptr(void* ptr) { this->c_end = (char*)ptr; } + void set_end_ptr(void* ptr) { + this->c_end = (char*)ptr; + this->reset_resident_memory(); + } /// Same as resize, but zeroes new elements. 
void resize_fill(size_t n) { size_t old_size = this->size(); + const auto new_size = this->byte_size(n); if (n > old_size) { this->reserve(n); + this->reset_resident_memory(this->c_start + new_size); memset(this->c_end, 0, this->byte_size(n - old_size)); + } else { + this->reset_resident_memory(this->c_start + new_size); } - this->c_end = this->c_start + this->byte_size(n); + this->c_end = this->c_start + new_size; } void resize_fill(size_t n, const T& value) { size_t old_size = this->size(); + const auto new_size = this->byte_size(n); if (n > old_size) { this->reserve(n); + this->reset_resident_memory(this->c_start + new_size); std::fill(t_end(), t_end() + n - old_size, value); + } else { + this->reset_resident_memory(this->c_start + new_size); } - this->c_end = this->c_start + this->byte_size(n); + this->c_end = this->c_start + new_size; } template <typename U, typename... TAllocatorParams> @@ -406,6 +479,7 @@ public: this->reserve_for_next_size(std::forward<TAllocatorParams>(allocator_params)...); } + this->reset_resident_memory(this->c_end + this->byte_size(1)); new (t_end()) T(std::forward<U>(x)); this->c_end += this->byte_size(1); } @@ -417,6 +491,7 @@ public: if (UNLIKELY(this->c_end + growth_size > this->c_end_of_storage)) { this->reserve(this->size() + num); } + this->reset_resident_memory(this->c_end + growth_size); std::fill(t_end(), t_end() + num, x); this->c_end = this->c_end + growth_size; } @@ -425,8 +500,10 @@ public: template <typename U, typename... TAllocatorParams> void add_num_element_without_reserve(U&& x, uint32_t num, TAllocatorParams&&... allocator_params) { + const auto growth_size = sizeof(T) * num; + this->reset_resident_memory(this->c_end + growth_size); std::fill(t_end(), t_end() + num, x); - this->c_end += sizeof(T) * num; + this->c_end += growth_size; } /** @@ -435,6 +512,7 @@ public: */ template <typename U, typename... TAllocatorParams> void push_back_without_reserve(U&& x, TAllocatorParams&&... 
allocator_params) { + this->reset_resident_memory(this->c_end + this->byte_size(1)); new (t_end()) T(std::forward<U>(x)); this->c_end += this->byte_size(1); } @@ -448,6 +526,7 @@ public: this->reserve_for_next_size(); } + this->reset_resident_memory(this->c_end + this->byte_size(1)); new (t_end()) T(std::forward<Args>(args)...); this->c_end += this->byte_size(1); } @@ -479,6 +558,7 @@ public: static_assert(pad_right_ >= 15); insert_prepare(from_begin, from_end, std::forward<TAllocatorParams>(allocator_params)...); size_t bytes_to_copy = this->byte_size(from_end - from_begin); + this->reset_resident_memory(this->c_end + bytes_to_copy); memcpy_small_allow_read_write_overflow15( this->c_end, reinterpret_cast<const void*>(&*from_begin), bytes_to_copy); this->c_end += bytes_to_copy; @@ -492,6 +572,7 @@ public: } size_t bytes_to_move = this->byte_size(end() - it); insert_prepare(from_begin, from_end); + this->reset_resident_memory(this->c_end + bytes_to_copy); if (UNLIKELY(bytes_to_move)) { memmove(this->c_end + bytes_to_copy - bytes_to_move, this->c_end - bytes_to_move, @@ -507,6 +588,7 @@ public: void insert_assume_reserved(It1 from_begin, It2 from_end) { this->assert_not_intersects(from_begin, from_end); size_t bytes_to_copy = this->byte_size(from_end - from_begin); + this->reset_resident_memory(this->c_end + bytes_to_copy); memcpy(this->c_end, reinterpret_cast<const void*>(&*from_begin), bytes_to_copy); this->c_end += bytes_to_copy; } @@ -514,6 +596,7 @@ public: template <typename It1, typename It2> void insert_assume_reserved_and_allow_overflow(It1 from_begin, It2 from_end) { size_t bytes_to_copy = this->byte_size(from_end - from_begin); + this->reset_resident_memory(this->c_end + bytes_to_copy); memcpy_small_allow_read_write_overflow15( this->c_end, reinterpret_cast<const void*>(&*from_begin), bytes_to_copy); this->c_end += bytes_to_copy; @@ -534,9 +617,11 @@ public: auto swap_stack_heap = [this](PODArray& arr1, PODArray& arr2) { size_t stack_size = arr1.size(); 
size_t stack_allocated = arr1.allocated_bytes(); + size_t stack_res_mem_used = arr1.c_res_mem - arr1.c_start; size_t heap_size = arr2.size(); size_t heap_allocated = arr2.allocated_bytes(); + size_t heap_res_mem_used = arr2.c_res_mem - arr2.c_start; /// Keep track of the stack content we have to copy. char* stack_c_start = arr1.c_start; @@ -545,12 +630,14 @@ public: arr1.c_start = arr2.c_start; arr1.c_end_of_storage = arr1.c_start + heap_allocated - arr2.pad_right - arr2.pad_left; arr1.c_end = arr1.c_start + this->byte_size(heap_size); + arr1.c_res_mem = arr1.c_start + heap_res_mem_used; /// Allocate stack space for arr2. arr2.alloc(stack_allocated); /// Copy the stack content. memcpy(arr2.c_start, stack_c_start, this->byte_size(stack_size)); arr2.c_end = arr2.c_start + this->byte_size(stack_size); + arr2.c_res_mem = arr2.c_start + stack_res_mem_used; }; auto do_move = [this](PODArray& src, PODArray& dest) { @@ -559,14 +646,17 @@ public: dest.alloc(src.allocated_bytes()); memcpy(dest.c_start, src.c_start, this->byte_size(src.size())); dest.c_end = dest.c_start + this->byte_size(src.size()); + dest.c_res_mem = dest.c_start + (src.c_res_mem - src.c_start); src.c_start = Base::null; src.c_end = Base::null; src.c_end_of_storage = Base::null; + src.c_res_mem = Base::null; } else { std::swap(dest.c_start, src.c_start); std::swap(dest.c_end, src.c_end); std::swap(dest.c_end_of_storage, src.c_end_of_storage); + std::swap(dest.c_res_mem, src.c_res_mem); } }; @@ -594,9 +684,11 @@ public: size_t lhs_size = this->size(); size_t lhs_allocated = this->allocated_bytes(); + size_t lhs_res_mem_used = this->c_res_mem - this->c_start; size_t rhs_size = rhs.size(); size_t rhs_allocated = rhs.allocated_bytes(); + size_t rhs_res_mem_used = rhs.c_res_mem - rhs.c_start; this->c_end_of_storage = this->c_start + rhs_allocated - Base::pad_right - Base::pad_left; @@ -604,6 +696,9 @@ public: this->c_end = this->c_start + this->byte_size(rhs_size); rhs.c_end = rhs.c_start + 
this->byte_size(lhs_size); + + this->c_res_mem = this->c_start + rhs_res_mem_used; + rhs.c_res_mem = rhs.c_start + lhs_res_mem_used; } else if (this->is_allocated_from_stack() && !rhs.is_allocated_from_stack()) { swap_stack_heap(*this, rhs); } else if (!this->is_allocated_from_stack() && rhs.is_allocated_from_stack()) { @@ -612,6 +707,7 @@ public: std::swap(this->c_start, rhs.c_start); std::swap(this->c_end, rhs.c_end); std::swap(this->c_end_of_storage, rhs.c_end_of_storage); + std::swap(this->c_res_mem, rhs.c_res_mem); } } @@ -628,6 +724,7 @@ public: this->reserve(round_up_to_power_of_two_or_zero(required_capacity)); size_t bytes_to_copy = this->byte_size(required_capacity); + this->reset_resident_memory(this->c_start + bytes_to_copy); memcpy(this->c_start, reinterpret_cast<const void*>(&*from_begin), bytes_to_copy); this->c_end = this->c_start + bytes_to_copy; } diff --git a/be/src/vec/common/pod_array_fwd.h b/be/src/vec/common/pod_array_fwd.h index bd0c7e272e4..cd1ce16ab9e 100644 --- a/be/src/vec/common/pod_array_fwd.h +++ b/be/src/vec/common/pod_array_fwd.h @@ -32,7 +32,8 @@ inline constexpr size_t integerRoundUp(size_t value, size_t dividend) { return ((value + dividend - 1) / dividend) * dividend; } -template <typename T, size_t initial_bytes = 4096, typename TAllocator = Allocator<false>, +template <typename T, size_t initial_bytes = 4096, + typename TAllocator = Allocator<false, false, false, NoTrackingDefaultMemoryAllocator>, size_t pad_right_ = 0, size_t pad_left_ = 0> class PODArray; @@ -40,7 +41,8 @@ class PODArray; * TODO, Adapt internal data structures to 512-bit era https://github.com/ClickHouse/ClickHouse/pull/42564 * Padding in internal data structures increased to 64 bytes., support AVX-512 simd. 
*/ -template <typename T, size_t initial_bytes = 4096, typename TAllocator = Allocator<false>> +template <typename T, size_t initial_bytes = 4096, + typename TAllocator = Allocator<false, false, false, NoTrackingDefaultMemoryAllocator>> using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 16, 15>; /** A helper for declaring PODArray that uses inline memory. @@ -49,8 +51,9 @@ using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 16, 15>; */ template <typename T, size_t inline_bytes, size_t rounded_bytes = integerRoundUp(inline_bytes, sizeof(T))> -using PODArrayWithStackMemory = - PODArray<T, rounded_bytes, - AllocatorWithStackMemory<Allocator<false>, rounded_bytes, alignof(T)>>; +using PODArrayWithStackMemory = PODArray< + T, rounded_bytes, + AllocatorWithStackMemory<Allocator<false, false, false, NoTrackingDefaultMemoryAllocator>, + rounded_bytes, alignof(T)>>; } // namespace doris::vectorized diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp index 84d0d617a96..3a69418971a 100644 --- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp +++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp @@ -341,7 +341,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) { int64_t size3 = size2 * 2; thread_context->attach_task(rc); - auto st = thread_context->thread_mem_tracker_mgr->try_reserve(size3, false); + auto st = thread_context->thread_mem_tracker_mgr->try_reserve(size3); EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_EQ(t->consumption(), size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); diff --git a/be/test/testutil/mock/mock_thread_mem_tracker_mgr.h b/be/test/testutil/mock/mock_thread_mem_tracker_mgr.h index 14c68c5be9c..ae7f856441c 100644 --- a/be/test/testutil/mock/mock_thread_mem_tracker_mgr.h +++ b/be/test/testutil/mock/mock_thread_mem_tracker_mgr.h @@ -23,7 +23,10 @@ namespace doris { class MockThreadMemTrackerMgr : public 
ThreadMemTrackerMgr { public: MockThreadMemTrackerMgr() : ThreadMemTrackerMgr() {} - Status try_reserve(int64_t size, bool only_check_process_memory = false) override { + Status try_reserve( + int64_t size, + TryReserveChecker checker = + TryReserveChecker::CHECK_TASK_AND_WORKLOAD_GROUP_AND_PROCESS) override { return _test_low_memory ? Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>("") : Status::OK(); } diff --git a/be/test/vec/common/pod_array_test.cpp b/be/test/vec/common/pod_array_test.cpp index db022a12729..8139b1e9937 100644 --- a/be/test/vec/common/pod_array_test.cpp +++ b/be/test/vec/common/pod_array_test.cpp @@ -28,8 +28,11 @@ namespace doris { TEST(PODArrayTest, PODArrayBasicMove) { static constexpr size_t initial_bytes = 32; - using Array = vectorized::PODArray<uint64_t, initial_bytes, - AllocatorWithStackMemory<Allocator<false>, initial_bytes>>; + using Array = vectorized::PODArray< + uint64_t, initial_bytes, + AllocatorWithStackMemory< + Allocator<false, false, false, NoTrackingDefaultMemoryAllocator>, + initial_bytes>>; { Array arr; @@ -141,8 +144,11 @@ TEST(PODArrayTest, PODArrayBasicMove) { TEST(PODArrayTest, PODArrayBasicSwap) { static constexpr size_t initial_bytes = 32; - using Array = vectorized::PODArray<uint64_t, initial_bytes, - AllocatorWithStackMemory<Allocator<false>, initial_bytes>>; + using Array = vectorized::PODArray< + uint64_t, initial_bytes, + AllocatorWithStackMemory< + Allocator<false, false, false, NoTrackingDefaultMemoryAllocator>, + initial_bytes>>; { Array arr; @@ -383,8 +389,11 @@ TEST(PODArrayTest, PODArrayBasicSwap) { TEST(PODArrayTest, PODArrayBasicSwapMoveConstructor) { static constexpr size_t initial_bytes = 32; - using Array = vectorized::PODArray<uint64_t, initial_bytes, - AllocatorWithStackMemory<Allocator<false>, initial_bytes>>; + using Array = vectorized::PODArray< + uint64_t, initial_bytes, + AllocatorWithStackMemory< + Allocator<false, false, false, NoTrackingDefaultMemoryAllocator>, + initial_bytes>>; { Array 
arr; @@ -652,4 +661,274 @@ TEST(PODArrayTest, PODErase) { } } +TEST(PODArrayTest, PaddedPODArrayTrackingMemory) { + auto t = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "UT"); + // PaddedPODArray + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(t); + EXPECT_EQ(t->consumption(), 0); + static constexpr size_t PRE_GROWTH_SIZE = (1ULL << 20); // 1M + + vectorized::PaddedPODArray<uint64_t, 4096> array; + size_t pad_right = vectorized::integerRoundUp(16, sizeof(uint64_t)); + size_t pad_left = + vectorized::integerRoundUp(vectorized::integerRoundUp(15, sizeof(uint64_t)), 16); + EXPECT_EQ(t->consumption(), 0); + + array.push_back(1); + EXPECT_EQ(array.size(), 1); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - pad_right); + + array.push_back(2); + EXPECT_EQ(array.size(), 2); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - pad_right); + + array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t)); + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t)); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE); + + array.push_back(3); + EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) + 1); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + // c_end_of_storage - c_start < integerRoundUp(c_end_new - c_start, PRE_GROWTH_SIZE) + EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - pad_right); + + array.assign({1, 2, 3}); + EXPECT_EQ(array.size(), 3); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - pad_right); + + array.resize((PRE_GROWTH_SIZE / sizeof(uint64_t)) * 3); + EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 3); + 
doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 3); + + array.add_num_element_without_reserve(1, 10); + EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 3 + 10); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - pad_right); + + array.add_num_element(1, (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 2); + EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 5 + 10); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 6); + + array.push_back_without_reserve(11); + EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 5 + 11); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 6); + + array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t) * 6, 2); + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 6); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 6); + + array.push_back_without_reserve(22); + EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 6 + 1); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 7); + + array.emplace_back(33); + EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 6 + 2); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 7); + + array.pop_back(); + EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 6 + 1); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 7); + + for (int i = 0; i < (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 2; i++) { + array.push_back(2); + array.pop_back(); + array.pop_back(); + } 
+ EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 4 + 1); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 7); + + array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t) * 7, 3); + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 7); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 7); + + { + vectorized::PaddedPODArray<uint64_t, 32> array2; + array2.push_back(3); + array2.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t), 3); + array.insert(array2.begin(), array2.end()); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 9); + } + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 8); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 8); + + { + vectorized::PaddedPODArray<uint64_t, 32> array2; + array2.resize_fill((PRE_GROWTH_SIZE / sizeof(uint64_t)) * 7, 3); + array.insert_small_allow_read_write_overflow15(array2.begin(), array2.end()); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 22); + } + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 15); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 15); + + { + vectorized::PaddedPODArray<uint64_t, 32> array2; + array2.push_back(3); + array.insert(array2.begin(), array2.end()); + } + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 15 + 1); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), array.allocated_bytes() - pad_left - pad_right); + + array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t) * 16, 4); + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 16); + 
doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 16); + + { + vectorized::PaddedPODArray<uint64_t, 32> array2; + array2.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t), 3); + array.insert(array.begin(), array2.begin(), array2.end()); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 18); + } + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 17); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 17); + + { + vectorized::PaddedPODArray<uint64_t, 32> array2; + array2.resize_fill((PRE_GROWTH_SIZE / sizeof(uint64_t) * 2), 3); + array.insert_assume_reserved(array2.begin(), array2.end()); + array.insert_assume_reserved_and_allow_overflow(array2.begin(), array2.end()); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 23); + } + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 21); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 21); + + size_t n = 100; + array.assign(n, (uint64_t)0); + EXPECT_EQ(array.size(), 100); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 21); + + array.add_num_element_without_reserve(1, PRE_GROWTH_SIZE / sizeof(uint64_t)); + EXPECT_EQ(array.size(), (PRE_GROWTH_SIZE / sizeof(uint64_t)) * 1 + 100); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 21); + + array.erase(array.begin() + 100, array.end()); + EXPECT_EQ(array.size(), 100); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 21); + } + EXPECT_EQ(t->consumption(), 0); +} + +TEST(PODArrayTest, 
PODArrayTrackingMemory) { + auto t = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "UT"); + + // PODArray with AllocatorWithStackMemory + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(t); + EXPECT_EQ(t->consumption(), 0); + static constexpr size_t PRE_GROWTH_SIZE = (1ULL << 20); // 1M + + static constexpr size_t initial_bytes = 32; + using Array = vectorized::PODArray< + uint64_t, initial_bytes, + AllocatorWithStackMemory< + Allocator<false, false, false, NoTrackingDefaultMemoryAllocator>>>; + Array array; + + array.push_back(1); + EXPECT_EQ(array.size(), 1); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), array.allocated_bytes()); + + array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t), 1); + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t)); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE); + + // test swap + size_t new_capacity = 0; + size_t new_size = 0; + { + Array array2; + array2.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t) * 2, 1); + EXPECT_EQ(array2.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 2); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 3); + new_capacity = array2.capacity(); + new_size = array2.size(); + + array.swap(array2); + EXPECT_EQ(array2.size(), PRE_GROWTH_SIZE / sizeof(uint64_t)); + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 2); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 3); + EXPECT_EQ(array.capacity(), new_capacity); + EXPECT_EQ(array.size(), new_size); + } + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 2); + } + EXPECT_EQ(t->consumption(), 0); + + // PODArray with Allocator + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(t); + EXPECT_EQ(t->consumption(), 0); + static constexpr size_t PRE_GROWTH_SIZE = (1ULL << 
20); // 1M + + static constexpr size_t initial_bytes = 32; + using Array = vectorized::PODArray< + uint64_t, initial_bytes, + Allocator<false, false, false, NoTrackingDefaultMemoryAllocator>>; + Array array; + + array.push_back(1); + EXPECT_EQ(array.size(), 1); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), array.allocated_bytes()); + + array.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t), 1); + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t)); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE); + + // test swap + size_t new_capacity = 0; + size_t new_size = 0; + { + Array array2; + array2.resize_fill(PRE_GROWTH_SIZE / sizeof(uint64_t) * 2, 1); + EXPECT_EQ(array2.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 2); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 3); + new_capacity = array2.capacity(); + new_size = array2.size(); + + array.swap(array2); + EXPECT_EQ(array2.size(), PRE_GROWTH_SIZE / sizeof(uint64_t)); + EXPECT_EQ(array.size(), PRE_GROWTH_SIZE / sizeof(uint64_t) * 2); + doris::thread_context()->thread_mem_tracker_mgr->flush_untracked_mem(); + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 3); + EXPECT_EQ(array.capacity(), new_capacity); + EXPECT_EQ(array.size(), new_size); + } + EXPECT_EQ(t->consumption(), PRE_GROWTH_SIZE * 2); + } + EXPECT_EQ(t->consumption(), 0); +} + } // end namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org