This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 871b0d87b06 [fix](memory) Fix ThreadMemTrackerMgr consumer tracker remove reserve memory (#40380) (#40452) 871b0d87b06 is described below commit 871b0d87b06c3964b1a21f1134d60ff619294ac5 Author: yiguolei <676222...@qq.com> AuthorDate: Thu Sep 5 22:04:27 2024 +0800 [fix](memory) Fix ThreadMemTrackerMgr consumer tracker remove reserve memory (#40380) (#40452) … `count_scope_mem` and `consumer_tracker` do not support reserved memory and do not need to use `_untracked_mem` to batch consumption: `count_scope_mem` is thread-local, and a `consumer_tracker` will not be bound by many threads, so there is no performance problem. ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> Co-authored-by: Xinyi Zou <zouxiny...@gmail.com> --- be/src/runtime/exec_env.h | 2 - be/src/runtime/exec_env_init.cpp | 1 - be/src/runtime/memory/mem_tracker_limiter.cpp | 2 +- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 2 - be/src/runtime/memory/thread_mem_tracker_mgr.h | 82 +++++---------------- be/src/runtime/thread_context.h | 86 +++++++++++----------- be/src/vec/common/allocator.cpp | 2 +- .../runtime/memory/thread_mem_tracker_mgr_test.cpp | 66 +++++------------ 8 files changed, 87 insertions(+), 156 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index f751aeb5d82..98d82f274e7 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -173,7 +173,6 @@ public: std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool; void init_mem_tracker(); std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; } - MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } MemTrackerLimiter* details_mem_tracker_set() { return _details_mem_tracker_set.get(); } std::shared_ptr<MemTracker> page_no_cache_mem_tracker() { return _page_no_cache_mem_tracker; } MemTracker* brpc_iobuf_block_memory_tracker() { return 
_brpc_iobuf_block_memory_tracker.get(); } @@ -355,7 +354,6 @@ private: // Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker; - MemTrackerLimiter* _orphan_mem_tracker_raw = nullptr; std::shared_ptr<MemTrackerLimiter> _details_mem_tracker_set; // page size not in cache, data page/index page/etc. std::shared_ptr<MemTracker> _page_no_cache_mem_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 674d5ee5115..834e402ca1e 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -587,7 +587,6 @@ void ExecEnv::init_mem_tracker() { _s_tracking_memory = true; _orphan_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan"); - _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); _details_mem_tracker_set = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "DetailsTrackerSet"); _page_no_cache_mem_tracker = diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index cc695a6fdd5..2d1466025a6 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -513,7 +513,7 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() { err_msg += fmt::format( " exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see " "be.INFO.", - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker()); + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label()); } else if (_type == Type::SCHEMA_CHANGE) { err_msg += fmt::format( " can modify `memory_limitation_per_thread_for_schema_change_bytes` in be.conf to " diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp 
index daa49548819..33dd0d41822 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -55,7 +55,6 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( _untracked_mem = 0; } _limiter_tracker = mem_tracker; - _limiter_tracker_raw = mem_tracker.get(); } void ThreadMemTrackerMgr::detach_limiter_tracker( @@ -67,7 +66,6 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( _reserved_mem = _reserved_mem_stack.back(); _reserved_mem_stack.pop_back(); _limiter_tracker = old_mem_tracker; - _limiter_tracker_raw = old_mem_tracker.get(); } void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 3762326cd07..bb0091f2e6d 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -64,7 +64,7 @@ public: // Must be fast enough! Thread update_tracker may be called very frequently. bool push_consumer_tracker(MemTracker* mem_tracker); void pop_consumer_tracker(); - std::string last_consumer_tracker() { + std::string last_consumer_tracker_label() { return _consumer_tracker_stack.empty() ? "" : _consumer_tracker_stack.back()->label(); } @@ -76,18 +76,6 @@ public: void reset_wg_wptr() { _wg_wptr.reset(); } - void start_count_scope_mem() { - CHECK(init()); - _scope_mem = _reserved_mem; // consume in advance - _count_scope_mem = true; - } - - int64_t stop_count_scope_mem() { - flush_untracked_mem(); - _count_scope_mem = false; - return _scope_mem - _reserved_mem; - } - // Note that, If call the memory allocation operation in Memory Hook, // such as calling LOG/iostream/sstream/stringstream/etc. 
related methods, // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, @@ -109,10 +97,6 @@ public: CHECK(init()); return _limiter_tracker; } - MemTrackerLimiter* limiter_mem_tracker_raw() { - CHECK(init()); - return _limiter_tracker_raw; - } void enable_wait_gc() { _wait_gc = true; } void disable_wait_gc() { _wait_gc = false; } @@ -127,7 +111,7 @@ public: return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, " "_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>", - std::to_string(_untracked_mem), _limiter_tracker_raw->log_usage(), + std::to_string(_untracked_mem), _limiter_tracker->log_usage(), fmt::to_string(consumer_tracker_buf)); } @@ -146,16 +130,12 @@ private: // so `attach_limiter_tracker` may be nested. std::vector<int64_t> _reserved_mem_stack; - bool _count_scope_mem = false; - int64_t _scope_mem = 0; - std::string _failed_consume_msg = std::string(); // If true, the Allocator will wait for the GC to free memory if it finds that the memory exceed limit. // A thread of query/load will only wait once during execution. 
bool _wait_gc = false; std::shared_ptr<MemTrackerLimiter> _limiter_tracker; - MemTrackerLimiter* _limiter_tracker_raw = nullptr; std::vector<MemTracker*> _consumer_tracker_stack; std::weak_ptr<WorkloadGroup> _wg_wptr; @@ -171,7 +151,6 @@ inline bool ThreadMemTrackerMgr::init() { if (_init) return true; if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) { _limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker(); - _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); _wait_gc = true; _init = true; return true; @@ -185,20 +164,21 @@ inline bool ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) { return false; } _consumer_tracker_stack.push_back(tracker); - tracker->release(_untracked_mem); - tracker->consume(_reserved_mem); // consume in advance return true; } inline void ThreadMemTrackerMgr::pop_consumer_tracker() { DCHECK(!_consumer_tracker_stack.empty()); - flush_untracked_mem(); - _consumer_tracker_stack.back()->consume(_untracked_mem); - _consumer_tracker_stack.back()->release(_reserved_mem); _consumer_tracker_stack.pop_back(); } inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) { + // `consumer_tracker` not support reserve memory and not require use `_untracked_mem` to batch consume, + // because `consumer_tracker` will not be bound by many threads, so there is no performance problem. 
+ for (auto* tracker : _consumer_tracker_stack) { + tracker->consume(size); + } + if (_reserved_mem != 0) { if (_reserved_mem > size) { // only need to subtract _reserved_mem, no need to consume MemTracker, @@ -282,62 +262,46 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { return; } _stop_consume = true; - DCHECK(_limiter_tracker_raw); + DCHECK(_limiter_tracker); _old_untracked_mem = _untracked_mem; - if (_count_scope_mem) { - _scope_mem += _untracked_mem; - } - _limiter_tracker_raw->consume(_old_untracked_mem); - for (auto* tracker : _consumer_tracker_stack) { - tracker->consume(_old_untracked_mem); - } + _limiter_tracker->consume(_old_untracked_mem); _untracked_mem -= _old_untracked_mem; _stop_consume = false; } inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { - DCHECK(_limiter_tracker_raw); + DCHECK(_limiter_tracker); DCHECK(size >= 0); CHECK(init()); // if _reserved_mem not equal to 0, repeat reserve, // _untracked_mem store bytes that not synchronized to process reserved memory. 
flush_untracked_mem(); - auto wg_ptr = _wg_wptr.lock(); - if (!_limiter_tracker_raw->try_consume(size)) { + if (!_limiter_tracker->try_consume(size)) { auto err_msg = fmt::format( "reserve memory failed, size: {}, because memory tracker consumption: {}, limit: " - "{}; {}; {}", - size, _limiter_tracker_raw->consumption(), _limiter_tracker_raw->limit(), - wg_ptr->memory_debug_string(), GlobalMemoryArbitrator::process_mem_log_str()); + "{}", + size, _limiter_tracker->consumption(), _limiter_tracker->limit()); return doris::Status::MemoryLimitExceeded(err_msg); } + auto wg_ptr = _wg_wptr.lock(); if (wg_ptr) { if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) { - auto err_msg = fmt::format( - "reserve memory failed, size: {}, because {}; memory tracker consumption: {}, " - "limit: {}; {}", - size, wg_ptr->memory_debug_string(), _limiter_tracker_raw->consumption(), - _limiter_tracker_raw->limit(), GlobalMemoryArbitrator::process_mem_log_str()); - _limiter_tracker_raw->release(size); // rollback + auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size, + wg_ptr->memory_debug_string()); + _limiter_tracker->release(size); // rollback return doris::Status::MemoryLimitExceeded(err_msg); } } if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) { auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size, GlobalMemoryArbitrator::process_mem_log_str()); - _limiter_tracker_raw->release(size); // rollback + _limiter_tracker->release(size); // rollback if (wg_ptr) { wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback } return doris::Status::MemoryLimitExceeded(err_msg); } - if (_count_scope_mem) { - _scope_mem += size; - } - for (auto* tracker : _consumer_tracker_stack) { - tracker->consume(size); - } _reserved_mem += size; return doris::Status::OK(); } @@ -346,17 +310,11 @@ inline void ThreadMemTrackerMgr::release_reserved() { if (_reserved_mem != 0) { 
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem + _untracked_mem); - _limiter_tracker_raw->release(_reserved_mem); + _limiter_tracker->release(_reserved_mem); auto wg_ptr = _wg_wptr.lock(); if (wg_ptr) { wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem); } - if (_count_scope_mem) { - _scope_mem -= _reserved_mem; - } - for (auto* tracker : _consumer_tracker_stack) { - tracker->release(_reserved_mem); - } _untracked_mem = 0; _reserved_mem = 0; } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 6158f0535be..0ee47a97962 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -82,10 +82,16 @@ #endif #if defined(USE_MEM_TRACKER) && !defined(BE_TEST) -// Count a code segment memory (memory malloc - memory free) to int64_t -// Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT_BY_HOOK(&scope_mem); xxx; xxx; } -#define SCOPED_MEM_COUNT_BY_HOOK(scope_mem) \ - auto VARNAME_LINENUM(scope_mem_count) = doris::ScopeMemCountByHook(scope_mem) +// Count a code segment memory +// Usage example: +// int64_t peak_mem = 0; +// { +// SCOPED_PEAK_MEM(&peak_mem); +// xxxx +// } +// LOG(INFO) << *peak_mem; +#define SCOPED_PEAK_MEM(peak_mem) \ + auto VARNAME_LINENUM(scope_peak_mem) = doris::ScopedPeakMem(peak_mem) // Count a code segment memory (memory malloc - memory free) to MemTracker. // Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe. 
@@ -94,8 +100,7 @@ #define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumerByHook(mem_tracker) #else -#define SCOPED_MEM_COUNT_BY_HOOK(scope_mem) \ - auto VARNAME_LINENUM(scoped_tls_mcbh) = doris::ScopedInitThreadContext() +#define SCOPED_PEAK_MEM() auto VARNAME_LINENUM(scoped_tls_pm) = doris::ScopedInitThreadContext() #define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \ auto VARNAME_LINENUM(scoped_tls_cmtbh) = doris::ScopedInitThreadContext() #endif @@ -229,7 +234,7 @@ public: // is released somewhere, the hook is triggered to cause the crash. std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr; [[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const { - return thread_mem_tracker_mgr->limiter_mem_tracker_raw(); + return thread_mem_tracker_mgr->limiter_mem_tracker().get(); } QueryThreadContext query_thread_context(); @@ -402,23 +407,22 @@ public: std::weak_ptr<WorkloadGroup> wg_wptr; }; -class ScopeMemCountByHook { +class ScopedPeakMem { public: - explicit ScopeMemCountByHook(int64_t* scope_mem) { + explicit ScopedPeakMem(int64* peak_mem) : _peak_mem(peak_mem), _mem_tracker("ScopedPeakMem") { ThreadLocalHandle::create_thread_local_if_not_exits(); - _scope_mem = scope_mem; - thread_context()->thread_mem_tracker_mgr->start_count_scope_mem(); - use_mem_hook = true; + thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(&_mem_tracker); } - ~ScopeMemCountByHook() { - use_mem_hook = false; - *_scope_mem += thread_context()->thread_mem_tracker_mgr->stop_count_scope_mem(); + ~ScopedPeakMem() { + thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); + *_peak_mem += _mem_tracker.peak_consumption(); ThreadLocalHandle::del_thread_local_if_count_is_zero(); } private: - int64_t* _scope_mem = nullptr; + int64* _peak_mem; + MemTracker _mem_tracker; }; // only hold thread context in scope. 
@@ -516,18 +520,18 @@ public: // Basic macros for mem tracker, usually do not need to be modified and used. #if defined(USE_MEM_TRACKER) && !defined(BE_TEST) // used to fix the tracking accuracy of caches. -#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ - do { \ - ORPHAN_TRACKER_CHECK(); \ - doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \ - size, tracker); \ +#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ + do { \ + ORPHAN_TRACKER_CHECK(); \ + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to( \ + size, tracker); \ } while (0) -#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ - do { \ - ORPHAN_TRACKER_CHECK(); \ - tracker->transfer_to( \ - size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw()); \ +#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ + do { \ + ORPHAN_TRACKER_CHECK(); \ + tracker->transfer_to( \ + size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()); \ } while (0) // Mem Hook to consume thread mem tracker @@ -553,21 +557,21 @@ public: // if use mem hook, avoid repeated consume. // must call create_thread_local_if_not_exits() before use thread_context(). 
-#define CONSUME_THREAD_MEM_TRACKER(size) \ - do { \ - if (size == 0 || doris::use_mem_hook) { \ - break; \ - } \ - if (doris::pthread_context_ptr_init) { \ - DCHECK(bthread_self() == 0); \ - doris::thread_context_ptr->consume_memory(size); \ - } else if (bthread_self() != 0) { \ - static_cast<doris::ThreadContext*>(bthread_getspecific(doris::btls_key)) \ - ->consume_memory(size); \ - } else if (doris::ExecEnv::ready()) { \ - MEMORY_ORPHAN_CHECK(); \ - doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \ - } \ +#define CONSUME_THREAD_MEM_TRACKER(size) \ + do { \ + if (size == 0 || doris::use_mem_hook) { \ + break; \ + } \ + if (doris::pthread_context_ptr_init) { \ + DCHECK(bthread_self() == 0); \ + doris::thread_context_ptr->consume_memory(size); \ + } else if (bthread_self() != 0) { \ + static_cast<doris::ThreadContext*>(bthread_getspecific(doris::btls_key)) \ + ->consume_memory(size); \ + } else if (doris::ExecEnv::ready()) { \ + MEMORY_ORPHAN_CHECK(); \ + doris::ExecEnv::GetInstance()->orphan_mem_tracker()->consume_no_update_peak(size); \ + } \ } while (0) #define RELEASE_THREAD_MEM_TRACKER(size) CONSUME_THREAD_MEM_TRACKER(-size) diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index ae5f27989b2..c7dda2b4c19 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -90,7 +90,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_mem size, doris::thread_context()->thread_mem_tracker()->label(), doris::thread_context()->thread_mem_tracker()->peak_consumption(), doris::thread_context()->thread_mem_tracker()->consumption(), - doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), + doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(), doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); if (doris::config::stacktrace_in_alloc_large_memory_bytes > 0 && diff --git 
a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp index c1feb43fe91..d4624273b0b 100644 --- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp +++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp @@ -181,8 +181,8 @@ TEST_F(ThreadMemTrackerMgrTest, MultiMemTracker) { bool rt = thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t2.get()); EXPECT_EQ(rt, true); - EXPECT_EQ(t1->consumption(), size1 + size2); - EXPECT_EQ(t2->consumption(), -size1); // _untracked_mem = size1 + EXPECT_EQ(t1->consumption(), size1 + size2); // _untracked_mem = size1 + EXPECT_EQ(t2->consumption(), 0); thread_context->consume_memory(size2); EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2); @@ -200,13 +200,13 @@ TEST_F(ThreadMemTrackerMgrTest, MultiMemTracker) { thread_context->consume_memory(size2); thread_context->consume_memory(-size1); // _untracked_mem = -size1 EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1 + size2); - EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2); - EXPECT_EQ(t3->consumption(), size1 + size2); + EXPECT_EQ(t2->consumption(), size2 + size2 + size2); + EXPECT_EQ(t3->consumption(), size2); thread_context->thread_mem_tracker_mgr->pop_consumer_tracker(); - EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1 + size2 - size1); - EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1); - EXPECT_EQ(t3->consumption(), size1 + size2 - size1); + EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1 + size2); + EXPECT_EQ(t2->consumption(), size2 + size2 + size2); + EXPECT_EQ(t3->consumption(), size2); thread_context->consume_memory(-size2); thread_context->consume_memory(size2); @@ -214,40 +214,14 @@ TEST_F(ThreadMemTrackerMgrTest, MultiMemTracker) { thread_context->thread_mem_tracker_mgr->pop_consumer_tracker(); EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1 + 
size2 - size1 - size2); - EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1 - size2); - EXPECT_EQ(t3->consumption(), size1 + size2 - size1); + EXPECT_EQ(t2->consumption(), size2 + size2); + EXPECT_EQ(t3->consumption(), size2); thread_context->consume_memory(-t1->consumption()); thread_context->detach_task(); // detach t1 EXPECT_EQ(t1->consumption(), 0); - EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1 - size2); - EXPECT_EQ(t3->consumption(), size1 + size2 - size1); -} - -TEST_F(ThreadMemTrackerMgrTest, ScopedCount) { - std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>(); - std::shared_ptr<MemTrackerLimiter> t1 = - MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ScopedCount"); - - int64_t size1 = 4 * 1024; - int64_t size2 = 4 * 1024 * 1024; - - thread_context->attach_task(TUniqueId(), t1, workload_group); - thread_context->thread_mem_tracker_mgr->start_count_scope_mem(); - thread_context->consume_memory(size1); - thread_context->consume_memory(size2); - thread_context->consume_memory(size1); - thread_context->consume_memory(size2); - thread_context->consume_memory(size1); - int64_t scope_mem = thread_context->thread_mem_tracker_mgr->stop_count_scope_mem(); - EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size1); - EXPECT_EQ(t1->consumption(), scope_mem); - - thread_context->consume_memory(-size2); - thread_context->consume_memory(-size1); - thread_context->consume_memory(-size2); - EXPECT_EQ(t1->consumption(), size1 + size1); - EXPECT_EQ(scope_mem, size1 + size2 + size1 + size2 + size1); + EXPECT_EQ(t2->consumption(), size2 + size2); + EXPECT_EQ(t3->consumption(), size2); } TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) { @@ -265,7 +239,7 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) { EXPECT_EQ(t->consumption(), size1 + size2); auto st = thread_context->try_reserve_memory(size3); - EXPECT_TRUE(st.ok()); + EXPECT_TRUE(st.ok()) << st.to_string(); 
EXPECT_EQ(t->consumption(), size1 + size2 + size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); @@ -304,7 +278,7 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) { EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0); st = thread_context->try_reserve_memory(size3); - EXPECT_TRUE(st.ok()); + EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_EQ(t->consumption(), size1 + size2 + size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); @@ -355,7 +329,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) { thread_context->attach_task(TUniqueId(), t, workload_group); auto st = thread_context->try_reserve_memory(size3); - EXPECT_TRUE(st.ok()); + EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_EQ(t->consumption(), size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); @@ -367,7 +341,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) { EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); st = thread_context->try_reserve_memory(size2); - EXPECT_TRUE(st.ok()); + EXPECT_TRUE(st.ok()) << st.to_string(); // ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2 // ThreadMemTrackerMgr _untracked_mem = 0 EXPECT_EQ(t->consumption(), size3 + size2); @@ -375,9 +349,9 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) { size3); // size3 - size2 + size2 st = thread_context->try_reserve_memory(size3); - EXPECT_TRUE(st.ok()); + EXPECT_TRUE(st.ok()) << st.to_string(); st = thread_context->try_reserve_memory(size3); - EXPECT_TRUE(st.ok()); + EXPECT_TRUE(st.ok()) << st.to_string(); thread_context->consume_memory(size3); thread_context->consume_memory(size2); thread_context->consume_memory(size3); @@ -412,14 +386,14 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) { thread_context->attach_task(TUniqueId(), t1, workload_group); auto st = thread_context->try_reserve_memory(size3); - EXPECT_TRUE(st.ok()); + 
EXPECT_TRUE(st.ok()) << st.to_string(); thread_context->consume_memory(size2); EXPECT_EQ(t1->consumption(), size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2); thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2); st = thread_context->try_reserve_memory(size3); - EXPECT_TRUE(st.ok()); + EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_EQ(t1->consumption(), size3); EXPECT_EQ(t2->consumption(), size3); EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2 + size3); @@ -431,7 +405,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) { thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3); st = thread_context->try_reserve_memory(size3); - EXPECT_TRUE(st.ok()); + EXPECT_TRUE(st.ok()) << st.to_string(); EXPECT_EQ(t1->consumption(), size3); EXPECT_EQ(t2->consumption(), size3 + size2); EXPECT_EQ(t3->consumption(), size3); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org