This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 4b30485d62 [improvement](memory) Refactor doris cache GC (#21522)
4b30485d62 is described below

commit 4b30485d623890b54110ed582a9d5c5674a0cc48
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Tue Jul 11 20:21:31 2023 +0800

    [improvement](memory) Refactor doris cache GC (#21522)

    Abstract CachePolicy, which controls the GC of all caches.
    Add stale sweep to all LRU caches, including page caches, etc.

    Example GC log after this change:

    I0710 18:32:35.729460 2945318 mem_info.cpp:172] End Full GC Free, Memory 3866389992 Bytes. cost(us): 112165339, details:
    FullGC:
        FreeTopMemoryQuery:
           - CancelCostTime: 1m51s
           - CancelTasksNum: 1
           - FindCostTime: 0.000ns
           - FreedMemory: 2.93 GB
        WorkloadGroup:
        Cache name=DataPageCache:
           - CostTime: 15.283ms
           - FreedEntrys: 9.56K
           - FreedMemory: 691.97 MB
           - PruneAllNumber: 1
           - PruneStaleNumber: 1
---
 be/src/common/config.cpp                           |  11 +-
 be/src/common/config.h                             |  10 +-
 be/src/olap/page_cache.cpp                         |  25 +--
 be/src/olap/page_cache.h                           |  57 ++++--
 .../rowset/segment_v2/inverted_index_cache.cpp     |  56 +-----
 .../olap/rowset/segment_v2/inverted_index_cache.h  |  36 +---
 be/src/olap/schema_cache.cpp                       |  22 ---
 be/src/olap/schema_cache.h                         |  23 ++-
 be/src/olap/segment_loader.cpp                     |  50 ++---
 be/src/olap/segment_loader.h                       |  65 +++----
 be/src/olap/storage_engine.cpp                     |   4 +-
 be/src/runtime/exec_env_init.cpp                   |   5 +-
 be/src/runtime/memory/cache_manager.cpp            |  49 +++++
 be/src/runtime/memory/cache_manager.h              |  63 +++++++
 be/src/runtime/memory/cache_policy.h               |  66 +++++++
 be/src/runtime/memory/lru_cache_policy.h           |  95 ++++++++++
 be/src/runtime/memory/mem_tracker_limiter.cpp      | 209 +++++++++++++--------
 be/src/runtime/memory/mem_tracker_limiter.h        |  25 ++-
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |   2 +-
 be/src/util/mem_info.cpp                           | 122 +++++-------
 be/src/util/mem_info.h                             |   6 +-
 be/src/util/obj_lru_cache.h                        |   1 +
 be/src/vec/common/allocator.cpp                    |   3 +-
 be/test/testutil/run_all_tests.cpp                 |   1 +
 24 files changed, 613 insertions(+), 393 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 95a583b9b0..7306437b20 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -315,6 +315,10 @@ DEFINE_String(pk_storage_page_cache_limit, "10%");
 // data page size for primary key index
 DEFINE_Int32(primary_key_data_page_size, "32768");
 
+DEFINE_mInt32(data_page_cache_stale_sweep_time_sec, "300");
+DEFINE_mInt32(index_page_cache_stale_sweep_time_sec, "600");
+DEFINE_mInt32(pk_index_page_cache_stale_sweep_time_sec, "600");
+
 DEFINE_Bool(enable_low_cardinality_optimize, "true");
 DEFINE_Bool(enable_low_cardinality_cache_code, "true");
 
@@ -949,11 +953,8 @@ DEFINE_Validator(file_cache_min_file_segment_size, [](const int64_t config) -> b
 DEFINE_Bool(clear_file_cache, "false");
 DEFINE_Bool(enable_file_cache_query_limit, "false");
 
-// inverted index searcher cache
-// cache entry stay time after lookup, default 1h
-DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "3600");
-// cache entry that have not been visited for a certain period of time can be cleaned up by GC thread
-DEFINE_mInt32(index_cache_entry_no_visit_gc_time_s, "3600");
+DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
+DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
 // inverted index searcher cache size
 DEFINE_String(inverted_index_searcher_cache_limit, "10%");
 // set `true` to enable insert searcher into cache when write inverted index data
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 656d955c4b..ef976b1f8f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -358,6 +358,12 @@ DECLARE_String(pk_storage_page_cache_limit);
 // data page size for primary key index
 DECLARE_Int32(primary_key_data_page_size);
 
+// page cache stale sweep time interval
+DECLARE_mInt32(data_page_cache_stale_sweep_time_sec);
+DECLARE_mInt32(index_page_cache_stale_sweep_time_sec);
+// pk index page cache has a great impact on the performance of MOW, so its sweep interval can be longer.
+DECLARE_mInt32(pk_index_page_cache_stale_sweep_time_sec);
+
 DECLARE_Bool(enable_low_cardinality_optimize);
 DECLARE_Bool(enable_low_cardinality_cache_code);
 
@@ -968,10 +974,10 @@ DECLARE_Bool(clear_file_cache);
 DECLARE_Bool(enable_file_cache_query_limit);
 
 // inverted index searcher cache
-// cache entry stay time after lookup, default 1h
+// cache entry stay time after lookup
 DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
 // cache entry that have not been visited for a certain period of time can be cleaned up by GC thread
-DECLARE_mInt32(index_cache_entry_no_visit_gc_time_s);
+DECLARE_mInt32(inverted_index_cache_stale_sweep_time_sec);
 // inverted index searcher cache size
 DECLARE_String(inverted_index_searcher_cache_limit);
 // set `true` to enable insert searcher into cache when write inverted index data
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index de9aeaa28f..47a2379576 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -37,24 +37,20 @@ StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percenta
                                    int64_t pk_index_cache_capacity, uint32_t num_shards)
         : _index_cache_percentage(index_cache_percentage) {
     if (index_cache_percentage == 0) {
-        _data_page_cache = std::unique_ptr<Cache>(
-                new_lru_cache("DataPageCache", capacity, LRUCacheType::SIZE, num_shards));
+        _data_page_cache = std::make_unique<DataPageCache>(capacity, num_shards);
     } else if (index_cache_percentage == 100) {
-        _index_page_cache = std::unique_ptr<Cache>(
-                new_lru_cache("IndexPageCache", capacity, LRUCacheType::SIZE, num_shards));
+        _index_page_cache = std::make_unique<IndexPageCache>(capacity, num_shards);
     } else if (index_cache_percentage > 0 && index_cache_percentage < 100) {
-        _data_page_cache = std::unique_ptr<Cache>(
-                new_lru_cache("DataPageCache", capacity * (100 - index_cache_percentage) / 100,
-                              LRUCacheType::SIZE, num_shards));
-        _index_page_cache = std::unique_ptr<Cache>(
-                new_lru_cache("IndexPageCache", capacity * index_cache_percentage / 100,
-                              LRUCacheType::SIZE, num_shards));
+        _data_page_cache = std::make_unique<DataPageCache>(
+                capacity * (100 - index_cache_percentage) / 100, num_shards);
+        _index_page_cache = std::make_unique<IndexPageCache>(
+                capacity * index_cache_percentage / 100, num_shards);
     } else {
         CHECK(false) << "invalid index page cache percentage";
     }
     if (pk_index_cache_capacity > 0) {
-        _pk_index_page_cache = std::unique_ptr<Cache>(new_lru_cache(
-                "PkIndexPageCache", pk_index_cache_capacity, LRUCacheType::SIZE, num_shards));
+        _pk_index_page_cache =
+                std::make_unique<PKIndexPageCache>(pk_index_cache_capacity, num_shards);
     }
 }
 
@@ -86,9 +82,4 @@ void StoragePageCache::insert(const CacheKey& key, DataPage* data, PageCacheHand
     *handle = PageCacheHandle(cache, lru_handle);
 }
 
-void StoragePageCache::prune(segment_v2::PageTypePB page_type) {
-    auto cache = _get_page_cache(page_type);
-    cache->prune();
-}
-
 } // namespace doris
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index 7e5ca4de6e..b065d52d0e 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -27,6 +27,7 @@
 #include <utility>
 
 #include "olap/lru_cache.h"
+#include "runtime/memory/lru_cache_policy.h"
 #include "util/slice.h"
 #include "vec/common/allocator.h"
 #include "vec/common/allocator_fwd.h"
@@ -36,7 +37,7 @@ namespace doris {
 class PageCacheHandle;
 
 template <typename TAllocator>
-class PageBase : private TAllocator {
+class PageBase : private TAllocator, LRUCacheValueBase {
 public:
     PageBase() : _data(nullptr), _size(0), _capacity(0) {}
 
@@ -99,6 +100,27 @@ public:
     }
 };
 
+    class DataPageCache : public LRUCachePolicy {
+    public:
+        DataPageCache(size_t capacity, uint32_t num_shards)
+                : LRUCachePolicy("DataPageCache", capacity, LRUCacheType::SIZE,
+                                 config::data_page_cache_stale_sweep_time_sec, num_shards) {}
+    };
+
+    class IndexPageCache : public LRUCachePolicy {
+    public:
+        IndexPageCache(size_t capacity, uint32_t num_shards)
+                : LRUCachePolicy("IndexPageCache", capacity, LRUCacheType::SIZE,
+                                 config::index_page_cache_stale_sweep_time_sec, num_shards) {}
+    };
+
+    class PKIndexPageCache : public LRUCachePolicy {
+    public:
+        PKIndexPageCache(size_t capacity, uint32_t num_shards)
+                : LRUCachePolicy("PKIndexPageCache", capacity, LRUCacheType::SIZE,
+                                 config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {}
+    };
+
     static constexpr uint32_t kDefaultNumShards = 16;
 
     // Create global instance of this class
@@ -138,33 +160,38 @@ public:
         return _get_page_cache(page_type) != nullptr;
     }
 
-    void prune(segment_v2::PageTypePB page_type);
-
-    int64_t get_page_cache_mem_consumption(segment_v2::PageTypePB page_type) {
-        return _get_page_cache(page_type)->mem_consumption();
-    }
-
 private:
     StoragePageCache();
     static StoragePageCache* _s_instance;
 
     int32_t _index_cache_percentage = 0;
-    std::unique_ptr<Cache> _data_page_cache = nullptr;
-    std::unique_ptr<Cache> _index_page_cache = nullptr;
+    std::unique_ptr<DataPageCache> _data_page_cache = nullptr;
+    std::unique_ptr<IndexPageCache> _index_page_cache = nullptr;
     // Cache data for primary key index data page, separated from the data
     // page cache to make it more flexible. We need this cache when constructing
     // the delete bitmap in unique key with MoW.
-    std::unique_ptr<Cache> _pk_index_page_cache = nullptr;
+    std::unique_ptr<PKIndexPageCache> _pk_index_page_cache = nullptr;
 
     Cache* _get_page_cache(segment_v2::PageTypePB page_type) {
         switch (page_type) {
         case segment_v2::DATA_PAGE: {
-            return _data_page_cache.get();
+            if (_data_page_cache) {
+                return _data_page_cache->get();
+            }
+            return nullptr;
+        }
+        case segment_v2::INDEX_PAGE: {
+            if (_index_page_cache) {
+                return _index_page_cache->get();
+            }
+            return nullptr;
+        }
+        case segment_v2::PRIMARY_KEY_INDEX_PAGE: {
+            if (_pk_index_page_cache) {
+                return _pk_index_page_cache->get();
+            }
+            return nullptr;
         }
-        case segment_v2::INDEX_PAGE:
-            return _index_page_cache.get();
-        case segment_v2::PRIMARY_KEY_INDEX_PAGE:
-            return _pk_index_page_cache.get();
         default:
             return nullptr;
         }
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index 243a738da9..f3c68984eb 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -62,7 +62,9 @@ void InvertedIndexSearcherCache::create_global_instance(size_t capacity, uint32_
 }
 
 InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards)
-        : _mem_tracker(std::make_unique<MemTracker>("InvertedIndexSearcherCache")) {
+        : LRUCachePolicy("InvertedIndexSearcherCache",
+                         config::inverted_index_cache_stale_sweep_time_sec),
+          _mem_tracker(std::make_unique<MemTracker>("InvertedIndexSearcherCache")) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     uint64_t fd_number = config::min_file_descriptor_number;
     struct rlimit l;
@@ -179,32 +181,6 @@ Status InvertedIndexSearcherCache::erase(const std::string& index_file_path) {
     return Status::OK();
 }
 
-int64_t InvertedIndexSearcherCache::prune() {
-    if (_cache) {
-        const int64_t curtime = UnixMillis();
-        int64_t byte_size = 0L;
-        auto pred = [curtime, &byte_size](const void* value) -> bool {
-            InvertedIndexSearcherCache::CacheValue* cache_value =
-                    (InvertedIndexSearcherCache::CacheValue*)value;
-            if ((cache_value->last_visit_time +
-                 config::index_cache_entry_no_visit_gc_time_s * 1000) < curtime) {
-                byte_size += cache_value->size;
-                return true;
-            }
-            return false;
-        };
-
-        MonotonicStopWatch watch;
-        watch.start();
-        // Prune cache in lazy mode to save cpu and minimize the time holding write lock
-        int64_t prune_num = _cache->prune_if(pred, true);
-        LOG(INFO) << "prune " << prune_num << " entries in inverted index cache.
cost(ms): " - << watch.elapsed_time() / 1000 / 1000; - return byte_size; - } - return 0L; -} - int64_t InvertedIndexSearcherCache::mem_consumption() { if (_cache) { return _cache->mem_consumption(); @@ -269,32 +245,6 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptr<roarin *handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle); } -int64_t InvertedIndexQueryCache::prune() { - if (_cache) { - const int64_t curtime = UnixMillis(); - int64_t byte_size = 0L; - auto pred = [curtime, &byte_size](const void* value) -> bool { - InvertedIndexQueryCache::CacheValue* cache_value = - (InvertedIndexQueryCache::CacheValue*)value; - if ((cache_value->last_visit_time + - config::index_cache_entry_no_visit_gc_time_s * 1000) < curtime) { - byte_size += cache_value->size; - return true; - } - return false; - }; - - MonotonicStopWatch watch; - watch.start(); - // Prune cache in lazy mode to save cpu and minimize the time holding write lock - int64_t prune_num = _cache->prune_if(pred, true); - LOG(INFO) << "prune " << prune_num << " entries in inverted index cache. cost(ms): " - << watch.elapsed_time() / 1000 / 1000; - return byte_size; - } - return 0L; -} - int64_t InvertedIndexQueryCache::mem_consumption() { if (_cache) { return _cache->mem_consumption(); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h b/be/src/olap/rowset/segment_v2/inverted_index_cache.h index 941fb2dc63..9f368eca0c 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h @@ -37,6 +37,7 @@ #include "io/fs/path.h" #include "olap/lru_cache.h" #include "olap/rowset/segment_v2/inverted_index_query_type.h" +#include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker.h" #include "util/slice.h" #include "util/time.h" @@ -55,7 +56,7 @@ using IndexSearcherPtr = std::shared_ptr<lucene::search::IndexSearcher>; class InvertedIndexCacheHandle; -class InvertedIndexSearcherCache { +class InvertedIndexSearcherCache : public LRUCachePolicy { public: // The cache key of index_searcher lru cache struct CacheKey { @@ -65,12 +66,8 @@ public: // The cache value of index_searcher lru cache. // Holding a opened index_searcher. - struct CacheValue { - // Save the last visit time of this cache entry. - // Use atomic because it may be modified by multi threads. - std::atomic<int64_t> last_visit_time = 0; + struct CacheValue : public LRUCacheValueBase { IndexSearcherPtr index_searcher; - size_t size = 0; }; // Create global instance of this class. @@ -110,8 +107,6 @@ public: // function `erase` called after compaction remove segment Status erase(const std::string& index_file_path); - int64_t prune(); - int64_t mem_consumption(); private: @@ -129,8 +124,6 @@ private: private: static InvertedIndexSearcherCache* _s_instance; - // A LRU cache to cache all opened index_searcher - std::unique_ptr<Cache> _cache = nullptr; std::unique_ptr<MemTracker> _mem_tracker = nullptr; }; @@ -199,7 +192,7 @@ private: class InvertedIndexQueryCacheHandle; -class InvertedIndexQueryCache { +class InvertedIndexQueryCache : public LRUCachePolicy { public: // cache key struct CacheKey { @@ -226,19 +219,14 @@ public: } }; - struct CacheValue { - // Save the last visit time of this cache entry. - // Use atomic because it may be modified by multi threads. 
- std::atomic<int64_t> last_visit_time = 0; + struct CacheValue : public LRUCacheValueBase { std::shared_ptr<roaring::Roaring> bitmap; - size_t size = 0; }; // Create global instance of this class - static void create_global_cache(size_t capacity, int32_t index_cache_percentage, - uint32_t num_shards = 16) { + static void create_global_cache(size_t capacity, uint32_t num_shards = 16) { DCHECK(_s_instance == nullptr); - static InvertedIndexQueryCache instance(capacity, index_cache_percentage, num_shards); + static InvertedIndexQueryCache instance(capacity, num_shards); _s_instance = &instance; } @@ -248,23 +236,19 @@ public: InvertedIndexQueryCache() = delete; - InvertedIndexQueryCache(size_t capacity, int32_t index_cache_percentage, uint32_t num_shards) { - _cache = std::unique_ptr<Cache>( - new_lru_cache("InvertedIndexQueryCache", capacity, LRUCacheType::SIZE, num_shards)); - } + InvertedIndexQueryCache(size_t capacity, uint32_t num_shards) + : LRUCachePolicy("InvertedIndexQueryCache", capacity, LRUCacheType::SIZE, + config::inverted_index_cache_stale_sweep_time_sec, num_shards) {} bool lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle); void insert(const CacheKey& key, std::shared_ptr<roaring::Roaring> bitmap, InvertedIndexQueryCacheHandle* handle); - int64_t prune(); - int64_t mem_consumption(); private: static InvertedIndexQueryCache* _s_instance; - std::unique_ptr<Cache> _cache {nullptr}; }; class InvertedIndexQueryCacheHandle { diff --git a/be/src/olap/schema_cache.cpp b/be/src/olap/schema_cache.cpp index 02b17f18ff..39d1a60a4c 100644 --- a/be/src/olap/schema_cache.cpp +++ b/be/src/olap/schema_cache.cpp @@ -75,26 +75,4 @@ void SchemaCache::create_global_instance(size_t capacity) { _s_instance = &instance; } -SchemaCache::SchemaCache(size_t capacity) { - _schema_cache = - std::unique_ptr<Cache>(new_lru_cache("SchemaCache", capacity, LRUCacheType::NUMBER)); -} - -Status SchemaCache::prune() { - const int64_t curtime = UnixMillis(); - auto pred = [curtime](const void* value) -> bool { - CacheValue* cache_value = (CacheValue*)value; - return (cache_value->last_visit_time + config::schema_cache_sweep_time_sec * 1000) < - curtime; - }; - - MonotonicStopWatch watch; - watch.start(); - // Prune cache in lazy mode to save cpu and minimize the time holding write lock - int64_t prune_num = _schema_cache->prune_if(pred, true); - LOG(INFO) << "prune " << prune_num - << " entries in SchemaCache cache. cost(ms): " << watch.elapsed_time() / 1000 / 1000; - return Status::OK(); -} - } // namespace doris \ No newline at end of file diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h index b45a252696..f34f7c296d 100644 --- a/be/src/olap/schema_cache.h +++ b/be/src/olap/schema_cache.h @@ -44,7 +44,7 @@ using SegmentIteratorUPtr = std::unique_ptr<SegmentIterator>; // eliminating the need for frequent allocation and deallocation during usage. // This caching mechanism proves immensely advantageous, particularly in scenarios // with high concurrency, where queries are executed simultaneously. 
-class SchemaCache { +class SchemaCache : public LRUCachePolicy { public: enum class Type { TABLET_SCHEMA = 0, SCHEMA = 1 }; @@ -65,11 +65,10 @@ public: if (!_s_instance || schema_key.empty()) { return {}; } - auto lru_handle = _schema_cache->lookup(schema_key); + auto lru_handle = _cache->lookup(schema_key); if (lru_handle) { - Defer release( - [cache = _schema_cache.get(), lru_handle] { cache->release(lru_handle); }); - auto value = (CacheValue*)_schema_cache->value(lru_handle); + Defer release([cache = _cache.get(), lru_handle] { cache->release(lru_handle); }); + auto value = (CacheValue*)_cache->value(lru_handle); value->last_visit_time = UnixMillis(); VLOG_DEBUG << "use cache schema"; if constexpr (std::is_same_v<SchemaType, TabletSchemaSPtr>) { @@ -101,26 +100,26 @@ public: CacheValue* cache_value = (CacheValue*)value; delete cache_value; }; - auto lru_handle = _schema_cache->insert(key, value, sizeof(CacheValue), deleter, - CachePriority::NORMAL, schema->mem_size()); - _schema_cache->release(lru_handle); + auto lru_handle = _cache->insert(key, value, sizeof(CacheValue), deleter, + CachePriority::NORMAL, schema->mem_size()); + _cache->release(lru_handle); } // Try to prune the cache if expired. Status prune(); - struct CacheValue { + struct CacheValue : public LRUCacheValueBase { Type type; - std::atomic<int64_t> last_visit_time = 0; // either tablet_schema or schema TabletSchemaSPtr tablet_schema = nullptr; SchemaSPtr schema = nullptr; }; private: - SchemaCache(size_t capacity); + SchemaCache(size_t capacity) + : LRUCachePolicy("SchemaCache", capacity, LRUCacheType::NUMBER, + config::schema_cache_sweep_time_sec) {} static constexpr char SCHEMA_DELIMITER = '-'; - std::unique_ptr<Cache> _schema_cache = nullptr; static SchemaCache* _s_instance; }; diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 3d61ef05db..aad8ac517a 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -32,12 +32,7 @@ void SegmentLoader::create_global_instance(size_t capacity) { _s_instance = &instance; } -SegmentLoader::SegmentLoader(size_t capacity) { - _cache = std::unique_ptr<Cache>( - new_lru_cache("SegmentMetaCache", capacity, LRUCacheType::NUMBER)); -} - -bool SegmentLoader::_lookup(const SegmentLoader::CacheKey& key, SegmentCacheHandle* handle) { +bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* handle) { auto lru_handle = _cache->lookup(key.encode()); if (lru_handle == nullptr) { return false; @@ -46,10 +41,10 @@ bool SegmentLoader::_lookup(const SegmentLoader::CacheKey& key, SegmentCacheHand return true; } -void SegmentLoader::_insert(const SegmentLoader::CacheKey& key, SegmentLoader::CacheValue& value, - SegmentCacheHandle* handle) { +void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::CacheValue& value, + SegmentCacheHandle* handle) { auto deleter = [](const doris::CacheKey& key, void* value) { - SegmentLoader::CacheValue* cache_value = (SegmentLoader::CacheValue*)value; + SegmentCache::CacheValue* cache_value = (SegmentCache::CacheValue*)value; cache_value->segments.clear(); delete cache_value; }; @@ -59,15 +54,19 @@ void SegmentLoader::_insert(const SegmentLoader::CacheKey& key, SegmentLoader::C meta_mem_usage += segment->meta_mem_usage(); } - auto lru_handle = _cache->insert(key.encode(), &value, sizeof(SegmentLoader::CacheValue), + auto lru_handle = _cache->insert(key.encode(), &value, sizeof(SegmentCache::CacheValue), deleter, CachePriority::NORMAL, meta_mem_usage); *handle = 
SegmentCacheHandle(_cache.get(), lru_handle); } +void SegmentCache::erase(const SegmentCache::CacheKey& key) { + _cache->erase(key.encode()); +} + Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle, bool use_cache) { - SegmentLoader::CacheKey cache_key(rowset->rowset_id()); - if (_lookup(cache_key, cache_handle)) { + SegmentCache::CacheKey cache_key(rowset->rowset_id()); + if (_segment_cache->lookup(cache_key, cache_handle)) { cache_handle->owned = false; return Status::OK(); } @@ -77,10 +76,10 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, RETURN_IF_ERROR(rowset->load_segments(&segments)); if (use_cache) { - // memory of SegmentLoader::CacheValue will be handled by SegmentLoader - SegmentLoader::CacheValue* cache_value = new SegmentLoader::CacheValue(); + // memory of SegmentCache::CacheValue will be handled by SegmentCache + SegmentCache::CacheValue* cache_value = new SegmentCache::CacheValue(); cache_value->segments = std::move(segments); - _insert(cache_key, *cache_value, cache_handle); + _segment_cache->insert(cache_key, *cache_value, cache_handle); } else { cache_handle->segments = std::move(segments); } @@ -88,25 +87,8 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, return Status::OK(); } -void SegmentLoader::erase_segment(const SegmentLoader::CacheKey& key) { - _cache->erase(key.encode()); -} - -Status SegmentLoader::prune() { - const int64_t curtime = UnixMillis(); - auto pred = [curtime](const void* value) -> bool { - SegmentLoader::CacheValue* cache_value = (SegmentLoader::CacheValue*)value; - return (cache_value->last_visit_time + config::tablet_rowset_stale_sweep_time_sec * 1000) < - curtime; - }; - - MonotonicStopWatch watch; - watch.start(); - // Prune cache in lazy mode to save cpu and minimize the time holding write lock - int64_t prune_num = _cache->prune_if(pred, true); - LOG(INFO) << "prune " << prune_num - << " entries in segment cache. cost(ms): " << watch.elapsed_time() / 1000 / 1000; - return Status::OK(); +void SegmentLoader::erase_segment(const SegmentCache::CacheKey& key) { + _segment_cache->erase(key); } } // namespace doris diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index 9e6a8af82a..600692750e 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -33,6 +33,7 @@ #include "olap/lru_cache.h" #include "olap/olap_common.h" // for rowset id #include "olap/rowset/segment_v2/segment.h" +#include "runtime/memory/lru_cache_policy.h" #include "util/time.h" namespace doris { @@ -53,7 +54,8 @@ class BetaRowset; // // Make sure that cache_handle is valid during the segment usage period. using BetaRowsetSharedPtr = std::shared_ptr<BetaRowset>; -class SegmentLoader { + +class SegmentCache : public LRUCachePolicy { public: // The cache key or segment lru cache struct CacheKey { @@ -61,18 +63,34 @@ public: RowsetId rowset_id; // Encode to a flat binary which can be used as LRUCache's key - std::string encode() const { return rowset_id.to_string(); } + [[nodiscard]] std::string encode() const { return rowset_id.to_string(); } }; // The cache value of segment lru cache. // Holding all opened segments of a rowset. - struct CacheValue { - // Save the last visit time of this cache entry. - // Use atomic because it may be modified by multi threads. 
- std::atomic<int64_t> last_visit_time = 0; + struct CacheValue : public LRUCacheValueBase { std::vector<segment_v2::SegmentSharedPtr> segments; }; + SegmentCache(size_t capacity) + : LRUCachePolicy("SegmentCache", capacity, LRUCacheType::NUMBER, + config::tablet_rowset_stale_sweep_time_sec) {} + + // Lookup the given rowset in the cache. + // If the rowset is found, the cache entry will be written into handle. + // Return true if entry is found, otherwise return false. + bool lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* handle); + + // Insert a cache entry by key. + // And the cache entry will be returned in handle. + // This function is thread-safe. + void insert(const SegmentCache::CacheKey& key, CacheValue& value, SegmentCacheHandle* handle); + + void erase(const SegmentCache::CacheKey& key); +}; + +class SegmentLoader { +public: // Create global instance of this class. // "capacity" is the capacity of lru cache. // TODO: Currently we use the number of rowset as the cache capacity. @@ -86,43 +104,20 @@ public: // Client should call create_global_cache before. static SegmentLoader* instance() { return _s_instance; } - SegmentLoader(size_t capacity); + SegmentLoader(size_t capacity) { _segment_cache = std::make_unique<SegmentCache>(capacity); } // Load segments of "rowset", return the "cache_handle" which contains segments. // If use_cache is true, it will be loaded from _cache. Status load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle, bool use_cache = false); - void erase_segment(const SegmentLoader::CacheKey& key); - - // Try to prune the segment cache if expired. - Status prune(); - int64_t prune_all() { return _cache->prune(); }; - int64_t segment_cache_mem_consumption() { return _cache->mem_consumption(); } - int64_t segment_cache_get_usage() { return _cache->get_usage(); } - double segment_cache_get_usage_ratio() { - return _cache->get_total_capacity() == 0 - ? 0 - : ((double)_cache->get_usage() / _cache->get_total_capacity()); - } + void erase_segment(const SegmentCache::CacheKey& key); private: SegmentLoader(); - // Lookup the given rowset in the cache. - // If the rowset is found, the cache entry will be written into handle. - // Return true if entry is found, otherwise return false. - bool _lookup(const SegmentLoader::CacheKey& key, SegmentCacheHandle* handle); - - // Insert a cache entry by key. - // And the cache entry will be returned in handle. - // This function is thread-safe. - void _insert(const SegmentLoader::CacheKey& key, CacheValue& value, SegmentCacheHandle* handle); - -private: static SegmentLoader* _s_instance; - // A LRU cache to cache all opened segments - std::unique_ptr<Cache> _cache = nullptr; + std::unique_ptr<SegmentCache> _segment_cache = nullptr; }; // A handle for a single rowset from segment lru cache. @@ -132,7 +127,7 @@ private: // So the caller need to make sure the handle is valid in lifecycle. class SegmentCacheHandle { public: - SegmentCacheHandle() {} + SegmentCacheHandle() = default; SegmentCacheHandle(Cache* cache, Cache::Handle* handle) : _cache(cache), _handle(handle) {} ~SegmentCacheHandle() { @@ -142,7 +137,7 @@ public: CHECK(!owned); // last_visit_time is set when release. // because it only be needed when pruning. 
- ((SegmentLoader::CacheValue*)_cache->value(_handle))->last_visit_time = UnixMillis(); + ((SegmentCache::CacheValue*)_cache->value(_handle))->last_visit_time = UnixMillis(); _cache->release(_handle); } } @@ -166,7 +161,7 @@ public: if (owned) { return segments; } else { - return ((SegmentLoader::CacheValue*)_cache->value(_handle))->segments; + return ((SegmentCache::CacheValue*)_cache->value(_handle))->segments; } } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 6a2b869099..af10713ea6 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -66,6 +66,7 @@ #include "olap/tablet_meta.h" #include "olap/task/engine_task.h" #include "olap/txn_manager.h" +#include "runtime/memory/cache_manager.h" #include "runtime/memory/mem_tracker.h" #include "runtime/stream_load/stream_load_recorder.h" #include "util/doris_metrics.h" @@ -614,8 +615,7 @@ void StorageEngine::clear_transaction_task(const TTransactionId transaction_id, } void StorageEngine::_start_clean_cache() { - SegmentLoader::instance()->prune(); - SchemaCache::instance()->prune(); + CacheManager::instance()->for_each_cache_prune_stale(); } Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) { diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 29ac809246..6599f36d79 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -55,6 +55,7 @@ #include "runtime/heartbeat_flags.h" #include "runtime/load_channel_mgr.h" #include "runtime/load_path_mgr.h" +#include "runtime/memory/cache_manager.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/memory/thread_mem_tracker_mgr.h" @@ -228,6 +229,8 @@ Status ExecEnv::_init_mem_env() { } // 3. init storage page cache + CacheManager::create_global_instance(); + int64_t storage_cache_limit = ParseUtil::parse_mem_spec(config::storage_page_cache_limit, MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); @@ -308,7 +311,7 @@ Status ExecEnv::_init_mem_env() { // Reason same as buffer_pool_limit inverted_index_query_cache_limit = inverted_index_query_cache_limit / 2; } - InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit, 10); + InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit); LOG(INFO) << "Inverted index query match cache memory limit: " << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) << ", origin config value: " << config::inverted_index_query_cache_limit; diff --git a/be/src/runtime/memory/cache_manager.cpp b/be/src/runtime/memory/cache_manager.cpp new file mode 100644 index 0000000000..027ed81b16 --- /dev/null +++ b/be/src/runtime/memory/cache_manager.cpp @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/cache_manager.h"
+
+#include "runtime/memory/cache_policy.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+int64_t CacheManager::for_each_cache_prune_stale_wrap(
+        std::function<void(CachePolicy* cache_policy)> func, RuntimeProfile* profile) {
+    int64_t freed_size = 0;
+    std::lock_guard<std::mutex> l(_caches_lock);
+    for (auto cache_policy : _caches) {
+        func(cache_policy);
+        freed_size += cache_policy->profile()->get_counter("FreedMemory")->value();
+        if (cache_policy->profile()->get_counter("FreedMemory")->value() != 0 && profile) {
+            profile->add_child(cache_policy->profile(), true, nullptr);
+        }
+    }
+    return freed_size;
+}
+
+int64_t CacheManager::for_each_cache_prune_stale(RuntimeProfile* profile) {
+    return for_each_cache_prune_stale_wrap(
+            [](CachePolicy* cache_policy) { cache_policy->prune_stale(); }, profile);
+}
+
+int64_t CacheManager::for_each_cache_prune_all(RuntimeProfile* profile) {
+    return for_each_cache_prune_stale_wrap(
+            [](CachePolicy* cache_policy) { cache_policy->prune_all(); }, profile);
+}
+
+} // namespace doris
diff --git a/be/src/runtime/memory/cache_manager.h b/be/src/runtime/memory/cache_manager.h
new file mode 100644
index 0000000000..6086c02b94
--- /dev/null
+++ b/be/src/runtime/memory/cache_manager.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+class CachePolicy;
+
+// Holds the list of all caches; used to prune them when memory is insufficient or on a timed sweep.
+class CacheManager {
+public:
+    static void create_global_instance() {
+        DCHECK(_s_instance == nullptr);
+        static CacheManager instance;
+        _s_instance = &instance;
+    }
+    static CacheManager* instance() { return _s_instance; }
+
+    std::list<CachePolicy*>::iterator register_cache(CachePolicy* cache) {
+        std::lock_guard<std::mutex> l(_caches_lock);
+        return _caches.insert(_caches.end(), cache);
+    }
+
+    void unregister_cache(std::list<CachePolicy*>::iterator it) {
+        std::lock_guard<std::mutex> l(_caches_lock);
+        if (it != _caches.end()) {
+            _caches.erase(it);
+            it = _caches.end();
+        }
+    }
+
+    int64_t for_each_cache_prune_stale_wrap(std::function<void(CachePolicy* cache_policy)> func,
+                                            RuntimeProfile* profile = nullptr);
+
+    int64_t for_each_cache_prune_stale(RuntimeProfile* profile = nullptr);
+
+    int64_t for_each_cache_prune_all(RuntimeProfile* profile = nullptr);
+
+private:
+    static inline CacheManager* _s_instance = nullptr;
+
+    std::mutex _caches_lock;
+    std::list<CachePolicy*> _caches;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h
new file mode 100644
index 0000000000..14308088e6
--- /dev/null
+++ b/be/src/runtime/memory/cache_policy.h
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/memory/cache_manager.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+static constexpr int32_t CACHE_MIN_FREE_SIZE = 67108864; // 64M
+
+// Base class of all caches; registers itself with the CacheManager when constructed.
+class CachePolicy {
+public:
+    CachePolicy(const std::string& name, uint32_t stale_sweep_time_s)
+            : _name(name), _stale_sweep_time_s(stale_sweep_time_s) {
+        _it = CacheManager::instance()->register_cache(this);
+        init_profile();
+    }
+
+    virtual ~CachePolicy() { CacheManager::instance()->unregister_cache(_it); };
+    virtual void prune_stale() = 0;
+    virtual void prune_all() = 0;
+
+    RuntimeProfile* profile() { return _profile.get(); }
+
+protected:
+    void init_profile() {
+        _profile = std::make_unique<RuntimeProfile>(fmt::format("Cache name={}", _name));
+        _prune_stale_number_counter = ADD_COUNTER(_profile, "PruneStaleNumber", TUnit::UNIT);
+        _prune_all_number_counter = ADD_COUNTER(_profile, "PruneAllNumber", TUnit::UNIT);
+        _freed_memory_counter = ADD_COUNTER(_profile, "FreedMemory", TUnit::BYTES);
+        _freed_entrys_counter = ADD_COUNTER(_profile, "FreedEntrys", TUnit::UNIT);
+        _cost_timer = ADD_TIMER(_profile, "CostTime");
+    }
+
+    std::string _name;
+    std::list<CachePolicy*>::iterator _it;
+
+    std::unique_ptr<RuntimeProfile> _profile;
+    RuntimeProfile::Counter* _prune_stale_number_counter = nullptr;
+    RuntimeProfile::Counter* _prune_all_number_counter = nullptr;
+    // Reset before each GC
+    RuntimeProfile::Counter* _freed_memory_counter = nullptr;
+    RuntimeProfile::Counter* _freed_entrys_counter = nullptr;
+    RuntimeProfile::Counter* _cost_timer = nullptr;
+
+    uint32_t _stale_sweep_time_s;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h
new file mode 100644
index 0000000000..fd900bea6c
--- /dev/null
+++ b/be/src/runtime/memory/lru_cache_policy.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/lru_cache.h"
+#include "runtime/memory/cache_policy.h"
+#include "util/time.h"
+
+namespace doris {
+
+// Base of the LRU cache values.
+struct LRUCacheValueBase {
+    // Save the last visit time of this cache entry.
+    // Use atomic because it may be modified by multi threads.
+    std::atomic<int64_t> last_visit_time = 0;
+    size_t size = 0;
+};
+
+// Base of the LRU caches; supports pruning stale entries and pruning all entries.
+class LRUCachePolicy : public CachePolicy {
+public:
+    LRUCachePolicy(const std::string& name, uint32_t stale_sweep_time_s)
+            : CachePolicy(name, stale_sweep_time_s) {};
+    LRUCachePolicy(const std::string& name, size_t capacity, LRUCacheType type,
+                   uint32_t stale_sweep_time_s, uint32_t num_shards = -1)
+            : CachePolicy(name, stale_sweep_time_s) {
+        _cache = num_shards == -1
+                         ? std::unique_ptr<Cache>(new_lru_cache(name, capacity, type))
+                         : std::unique_ptr<Cache>(new_lru_cache(name, capacity, type, num_shards));
+    }
+
+    ~LRUCachePolicy() override = default;
+
+    // Try to prune the cache if expired.
+ void prune_stale() override { + if (_cache->mem_consumption() > CACHE_MIN_FREE_SIZE) { + COUNTER_SET(_cost_timer, (int64_t)0); + SCOPED_TIMER(_cost_timer); + const int64_t curtime = UnixMillis(); + int64_t byte_size = 0L; + auto pred = [this, curtime, &byte_size](const void* value) -> bool { + LRUCacheValueBase* cache_value = (LRUCacheValueBase*)value; + if ((cache_value->last_visit_time + _stale_sweep_time_s * 1000) < curtime) { + byte_size += cache_value->size; + return true; + } + return false; + }; + + // Prune cache in lazy mode to save cpu and minimize the time holding write lock + COUNTER_SET(_freed_entrys_counter, _cache->prune_if(pred, true)); + COUNTER_SET(_freed_memory_counter, byte_size); + COUNTER_UPDATE(_prune_stale_number_counter, 1); + LOG(INFO) << fmt::format("{} prune stale {} entries, {} bytes, {} times prune", _name, + _freed_entrys_counter->value(), _freed_memory_counter->value(), + _prune_stale_number_counter->value()); + } + } + + void prune_all() override { + if (_cache->mem_consumption() > CACHE_MIN_FREE_SIZE) { + COUNTER_SET(_cost_timer, (int64_t)0); + SCOPED_TIMER(_cost_timer); + auto size = _cache->mem_consumption(); + COUNTER_SET(_freed_entrys_counter, _cache->prune()); + COUNTER_SET(_freed_memory_counter, size); + COUNTER_UPDATE(_prune_all_number_counter, 1); + LOG(INFO) << fmt::format("{} prune all {} entries, {} bytes, {} times prune", _name, + _freed_entrys_counter->value(), _freed_memory_counter->value(), + _prune_stale_number_counter->value()); + } + } + + Cache* get() { return _cache.get(); } + +protected: + std::unique_ptr<Cache> _cache; +}; + +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index e6c09d51c1..e44da881eb 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -35,6 +35,7 @@ #include "util/mem_info.h" #include "util/perf_counters.h" #include "util/pretty_printer.h" +#include "util/runtime_profile.h" namespace doris { @@ -45,6 +46,18 @@ static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(MEM_TRACKER_GRO std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true}; +// Reset before each free +static std::unique_ptr<RuntimeProfile> free_top_memory_task_profile { + std::make_unique<RuntimeProfile>("-")}; +static RuntimeProfile::Counter* find_cost_time = + ADD_TIMER(free_top_memory_task_profile, "FindCostTime"); +static RuntimeProfile::Counter* cancel_cost_time = + ADD_TIMER(free_top_memory_task_profile, "CancelCostTime"); +static RuntimeProfile::Counter* freed_memory_counter = + ADD_COUNTER(free_top_memory_task_profile, "FreedMemory", TUnit::BYTES); +static RuntimeProfile::Counter* cancel_tasks_counter = + ADD_COUNTER(free_top_memory_task_profile, "CancelTasksNum", TUnit::UNIT); + MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit, RuntimeProfile* profile, const std::string& profile_counter_name) { @@ -324,7 +337,8 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str(int64_t bytes) { int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str, - const std::string& mem_available_str, Type type) { + const std::string& mem_available_str, + RuntimeProfile* profile, Type type) { return free_top_memory_query( min_free_mem, type, mem_tracker_limiter_pool, [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption, @@ -339,13 +353,15 @@ int64_t 
MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, BackendOptions::get_localhost(), vm_rss_str, MemInfo::mem_limit_str(), mem_available_str, print_bytes(MemInfo::sys_mem_available_low_water_mark())); - }); + }, + profile); } template <typename TrackerGroups> int64_t MemTrackerLimiter::free_top_memory_query( int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, - const std::function<std::string(int64_t, const std::string&)>& cancel_msg) { + const std::function<std::string(int64_t, const std::string&)>& cancel_msg, + RuntimeProfile* profile) { using MemTrackerMinQueue = std::priority_queue<std::pair<int64_t, std::string>, std::vector<std::pair<int64_t, std::string>>, std::greater<std::pair<int64_t, std::string>>>; @@ -353,54 +369,66 @@ int64_t MemTrackerLimiter::free_top_memory_query( // After greater than min_free_mem, will not be modified. int64_t prepare_free_mem = 0; std::vector<std::string> canceling_task; + COUNTER_SET(cancel_cost_time, (int64_t)0); + COUNTER_SET(find_cost_time, (int64_t)0); + COUNTER_SET(freed_memory_counter, (int64_t)0); + COUNTER_SET(cancel_tasks_counter, (int64_t)0); - auto cancel_top_query = [&cancel_msg, type](auto& min_pq, auto& canceling_task) -> int64_t { + auto cancel_top_query = [&cancel_msg, type, profile](auto& min_pq, + auto& canceling_task) -> int64_t { std::vector<std::string> usage_strings; - int64_t freed_mem = 0; - while (!min_pq.empty()) { - TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second); - if (cancelled_queryid == TUniqueId()) { + { + SCOPED_TIMER(cancel_cost_time); + while (!min_pq.empty()) { + TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second); + if (cancelled_queryid == TUniqueId()) { + min_pq.pop(); + continue; + } + ExecEnv::GetInstance()->fragment_mgr()->cancel_query( + cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, + cancel_msg(min_pq.top().first, min_pq.top().second)); + + COUNTER_UPDATE(freed_memory_counter, min_pq.top().first); + COUNTER_UPDATE(cancel_tasks_counter, 1); + usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second, + min_pq.top().first)); min_pq.pop(); - continue; } - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, - cancel_msg(min_pq.top().first, min_pq.top().second)); - - freed_mem += min_pq.top().first; - usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second, - min_pq.top().first)); - min_pq.pop(); } + profile->merge(free_top_memory_task_profile.get()); LOG(INFO) << "Process GC Free Top Memory Usage " << type_string(type) << ": " << join(usage_strings, ",") << ". 
previous canceling task: " << join(canceling_task, ","); - return freed_mem; + return freed_memory_counter->value(); }; - for (unsigned i = 1; i < tracker_groups.size(); ++i) { - std::lock_guard<std::mutex> l(tracker_groups[i].group_lock); - for (auto tracker : tracker_groups[i].trackers) { - if (tracker->type() == type) { - if (tracker->is_query_cancelled()) { - canceling_task.push_back( - fmt::format("{}:{} Bytes", tracker->label(), tracker->consumption())); - continue; - } - if (tracker->consumption() > min_free_mem) { - MemTrackerMinQueue min_pq_single; - min_pq_single.emplace(tracker->consumption(), tracker->label()); - return cancel_top_query(min_pq_single, canceling_task); - } else if (tracker->consumption() + prepare_free_mem < min_free_mem) { - min_pq.emplace(tracker->consumption(), tracker->label()); - prepare_free_mem += tracker->consumption(); - } else if (tracker->consumption() > min_pq.top().first) { - min_pq.emplace(tracker->consumption(), tracker->label()); - prepare_free_mem += tracker->consumption(); - while (prepare_free_mem - min_pq.top().first > min_free_mem) { - prepare_free_mem -= min_pq.top().first; - min_pq.pop(); + { + SCOPED_TIMER(find_cost_time); + for (unsigned i = 1; i < tracker_groups.size(); ++i) { + std::lock_guard<std::mutex> l(tracker_groups[i].group_lock); + for (auto tracker : tracker_groups[i].trackers) { + if (tracker->type() == type) { + if (tracker->is_query_cancelled()) { + canceling_task.push_back(fmt::format("{}:{} Bytes", tracker->label(), + tracker->consumption())); + continue; + } + if (tracker->consumption() > min_free_mem) { + MemTrackerMinQueue min_pq_single; + min_pq_single.emplace(tracker->consumption(), tracker->label()); + return cancel_top_query(min_pq_single, canceling_task); + } else if (tracker->consumption() + prepare_free_mem < min_free_mem) { + min_pq.emplace(tracker->consumption(), tracker->label()); + prepare_free_mem += tracker->consumption(); + } else if (tracker->consumption() > min_pq.top().first) { + min_pq.emplace(tracker->consumption(), tracker->label()); + prepare_free_mem += tracker->consumption(); + while (prepare_free_mem - min_pq.top().first > min_free_mem) { + prepare_free_mem -= min_pq.top().first; + min_pq.pop(); + } } } } @@ -412,7 +440,7 @@ int64_t MemTrackerLimiter::free_top_memory_query( int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str, - Type type) { + RuntimeProfile* profile, Type type) { return free_top_overcommit_query( min_free_mem, type, mem_tracker_limiter_pool, [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption, @@ -427,35 +455,45 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, BackendOptions::get_localhost(), vm_rss_str, MemInfo::soft_mem_limit_str(), mem_available_str, print_bytes(MemInfo::sys_mem_available_warning_water_mark())); - }); + }, + profile); } template <typename TrackerGroups> int64_t MemTrackerLimiter::free_top_overcommit_query( int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, - const std::function<std::string(int64_t, const std::string&)>& cancel_msg) { + const std::function<std::string(int64_t, const std::string&)>& cancel_msg, + RuntimeProfile* profile) { std::priority_queue<std::pair<int64_t, std::string>> max_pq; std::unordered_map<std::string, int64_t> query_consumption; std::vector<std::string> canceling_task; + COUNTER_SET(cancel_cost_time, (int64_t)0); + COUNTER_SET(find_cost_time, (int64_t)0); + 
COUNTER_SET(freed_memory_counter, (int64_t)0); + COUNTER_SET(cancel_tasks_counter, (int64_t)0); - for (unsigned i = 1; i < tracker_groups.size(); ++i) { - std::lock_guard<std::mutex> l(tracker_groups[i].group_lock); - for (auto tracker : tracker_groups[i].trackers) { - if (tracker->type() == type) { - // 32M small query does not cancel - if (tracker->consumption() <= 33554432 || - tracker->consumption() < tracker->limit()) { - continue; - } - if (tracker->is_query_cancelled()) { - canceling_task.push_back( - fmt::format("{}:{} Bytes", tracker->label(), tracker->consumption())); - continue; + { + SCOPED_TIMER(find_cost_time); + for (unsigned i = 1; i < tracker_groups.size(); ++i) { + std::lock_guard<std::mutex> l(tracker_groups[i].group_lock); + for (auto tracker : tracker_groups[i].trackers) { + if (tracker->type() == type) { + // 32M small query does not cancel + if (tracker->consumption() <= 33554432 || + tracker->consumption() < tracker->limit()) { + continue; + } + if (tracker->is_query_cancelled()) { + canceling_task.push_back(fmt::format("{}:{} Bytes", tracker->label(), + tracker->consumption())); + continue; + } + int64_t overcommit_ratio = + (static_cast<double>(tracker->consumption()) / tracker->limit()) * + 10000; + max_pq.emplace(overcommit_ratio, tracker->label()); + query_consumption[tracker->label()] = tracker->consumption(); } - int64_t overcommit_ratio = - (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000; - max_pq.emplace(overcommit_ratio, tracker->label()); - query_consumption[tracker->label()] = tracker->consumption(); } } } @@ -466,37 +504,42 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( } std::vector<std::string> usage_strings; - int64_t freed_mem = 0; - while (!max_pq.empty()) { - TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second); - if (cancelled_queryid == TUniqueId()) { + { + SCOPED_TIMER(cancel_cost_time); + while (!max_pq.empty()) { + TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second); + if (cancelled_queryid == TUniqueId()) { + max_pq.pop(); + continue; + } + int64_t query_mem = query_consumption[max_pq.top().second]; + ExecEnv::GetInstance()->fragment_mgr()->cancel_query( + cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, + cancel_msg(query_mem, max_pq.top().second)); + + usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}", + max_pq.top().second, query_mem, + max_pq.top().first)); + COUNTER_UPDATE(freed_memory_counter, query_mem); + COUNTER_UPDATE(cancel_tasks_counter, 1); + if (freed_memory_counter->value() > min_free_mem) { + break; + } max_pq.pop(); - continue; } - int64_t query_mem = query_consumption[max_pq.top().second]; - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, - cancel_msg(query_mem, max_pq.top().second)); - - usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}", - max_pq.top().second, query_mem, max_pq.top().first)); - freed_mem += query_mem; - if (freed_mem > min_free_mem) { - break; - } - max_pq.pop(); } + profile->merge(free_top_memory_task_profile.get()); LOG(INFO) << "Process GC Free Top Memory Overcommit " << type_string(type) << ": " << join(usage_strings, ",") << ". 
previous canceling task: " << join(canceling_task, ","); - return freed_mem; + return freed_memory_counter->value(); } int64_t MemTrackerLimiter::tg_memory_limit_gc( int64_t need_free_mem, int64_t used_memory, uint64_t id, const std::string& name, - int64_t memory_limit, - std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups) { + int64_t memory_limit, std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups, + RuntimeProfile* profile) { if (need_free_mem <= 0) { return 0; } @@ -515,11 +558,11 @@ int64_t MemTrackerLimiter::tg_memory_limit_gc( }; if (config::enable_query_memory_overcommit) { freed_mem += MemTrackerLimiter::free_top_overcommit_query( - need_free_mem - freed_mem, query_type, tracker_limiter_groups, cancel_str); + need_free_mem - freed_mem, query_type, tracker_limiter_groups, cancel_str, profile); } if (freed_mem < need_free_mem) { - freed_mem += MemTrackerLimiter::free_top_memory_query(need_free_mem - freed_mem, query_type, - tracker_limiter_groups, cancel_str); + freed_mem += MemTrackerLimiter::free_top_memory_query( + need_free_mem - freed_mem, query_type, tracker_limiter_groups, cancel_str, profile); } LOG(INFO) << fmt::format( "task group {} finished gc, memory_limit: {}, used_memory: {}, freed_mem: {}.", name, diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 6e3dd3d51e..c90845ae88 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -175,37 +175,44 @@ public: // vm_rss_str and mem_available_str recorded when gc is triggered, for log printing. static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str, - Type type = Type::QUERY); + RuntimeProfile* profile, Type type = Type::QUERY); template <typename TrackerGroups> static int64_t free_top_memory_query( int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, - const std::function<std::string(int64_t, const std::string&)>& cancel_msg); + const std::function<std::string(int64_t, const std::string&)>& cancel_msg, + RuntimeProfile* profile); static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str, - const std::string& mem_available_str) { - return free_top_memory_query(min_free_mem, vm_rss_str, mem_available_str, Type::LOAD); + const std::string& mem_available_str, + RuntimeProfile* profile) { + return free_top_memory_query(min_free_mem, vm_rss_str, mem_available_str, profile, + Type::LOAD); } // Start canceling from the query with the largest memory overcommit ratio until the memory // of min_free_mem size is freed. 
static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str, - Type type = Type::QUERY); + RuntimeProfile* profile, Type type = Type::QUERY); template <typename TrackerGroups> static int64_t free_top_overcommit_query( int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, - const std::function<std::string(int64_t, const std::string&)>& cancel_msg); + const std::function<std::string(int64_t, const std::string&)>& cancel_msg, + RuntimeProfile* profile); static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str, - const std::string& mem_available_str) { - return free_top_overcommit_query(min_free_mem, vm_rss_str, mem_available_str, Type::LOAD); + const std::string& mem_available_str, + RuntimeProfile* profile) { + return free_top_overcommit_query(min_free_mem, vm_rss_str, mem_available_str, profile, + Type::LOAD); } static int64_t tg_memory_limit_gc( int64_t request_free_memory, int64_t used_memory, uint64_t id, const std::string& name, int64_t memory_limit, - std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups); + std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups, + RuntimeProfile* profile); // only for Type::QUERY or Type::LOAD. static TUniqueId label_to_queryid(const std::string& label) { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index deafcdc241..ca4334ad45 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -173,7 +173,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) { } // Large memory alloc should use allocator.h // Direct malloc or new large memory, unable to catch std::bad_alloc, BE may OOM. - if (size > 1024l * 1024 * 1024) { // 1G + if (size > 1024l * 1024 * 1024 && !doris::config::disable_memory_gc) { // 1G _stop_consume = true; LOG(WARNING) << fmt::format("MemHook alloc large memory: {}, stacktrace:\n{}", size, get_stack_trace()); diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 97125d0942..29c58ca23e 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -38,9 +38,8 @@ #include "common/config.h" #include "common/status.h" #include "gutil/strings/split.h" -#include "olap/page_cache.h" -#include "olap/rowset/segment_v2/inverted_index_cache.h" -#include "olap/segment_loader.h" +#include "runtime/exec_env.h" +#include "runtime/memory/cache_manager.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/task_group/task_group.h" #include "runtime/task_group/task_group_manager.h" @@ -48,6 +47,7 @@ #include "util/defer_op.h" #include "util/parse_util.h" #include "util/pretty_printer.h" +#include "util/runtime_profile.h" #include "util/stopwatch.hpp" #include "util/string_parser.hpp" @@ -104,34 +104,6 @@ void MemInfo::refresh_allocator_mem() { #endif } -void MemInfo::process_cache_gc(int64_t& freed_mem) { - // TODO, free more cache, and should free a certain percentage of capacity, not all. 
- int32_t min_free_size = 33554432; // 32M - - if (StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE) > - min_free_size) { - freed_mem += - StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); - StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); - } - - if (segment_v2::InvertedIndexSearcherCache::instance()->mem_consumption() > min_free_size) { - freed_mem += segment_v2::InvertedIndexSearcherCache::instance()->prune(); - } - - if (segment_v2::InvertedIndexQueryCache::instance()->mem_consumption() > min_free_size) { - freed_mem += segment_v2::InvertedIndexQueryCache::instance()->prune(); - } - - if (StoragePageCache::instance()->get_page_cache_mem_consumption( - segment_v2::PRIMARY_KEY_INDEX_PAGE) > min_free_size) { - freed_mem += StoragePageCache::instance()->get_page_cache_mem_consumption( - segment_v2::PRIMARY_KEY_INDEX_PAGE); - StoragePageCache::instance()->prune(segment_v2::PRIMARY_KEY_INDEX_PAGE); - } - je_purge_all_arena_dirty_pages(); -} - // step1: free all cache // step2: free resource groups memory that enable overcommit // step3: free global top overcommit query, if enable query memory overcommit @@ -140,36 +112,42 @@ bool MemInfo::process_minor_gc() { MonotonicStopWatch watch; watch.start(); int64_t freed_mem = 0; - std::string vm_rss_str = PerfCounters::get_vm_rss_str(); - std::string mem_available_str = MemInfo::sys_mem_available_str(); + std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>(""); + std::string pre_vm_rss = PerfCounters::get_vm_rss_str(); + std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { je_purge_all_arena_dirty_pages(); - LOG(INFO) << fmt::format("End Minor GC, Free Memory {} Bytes. cost(us): {}", freed_mem, - watch.elapsed_time() / 1000); + std::stringstream ss; + profile->pretty_print(&ss); + LOG(INFO) << fmt::format("End Minor GC, Free Memory {} Bytes. 
cost(us): {}, details: {}", + freed_mem, watch.elapsed_time() / 1000, ss.str()); }}; - MemInfo::process_cache_gc(freed_mem); + freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get()); + je_purge_all_arena_dirty_pages(); if (freed_mem > _s_process_minor_gc_size) { return true; } - // TODO add freed_mem - SegmentLoader::instance()->prune(); - - freed_mem += tg_soft_memory_limit_gc(_s_process_minor_gc_size - freed_mem); + RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); + freed_mem += tg_soft_memory_limit_gc(_s_process_minor_gc_size - freed_mem, tg_profile); if (freed_mem > _s_process_minor_gc_size) { return true; } - VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( - "Before free top memory overcommit query in Minor GC", MemTrackerLimiter::Type::QUERY); if (config::enable_query_memory_overcommit) { + VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( + "Before free top memory overcommit query in Minor GC", + MemTrackerLimiter::Type::QUERY); + RuntimeProfile* toq_profile = + profile->create_child("FreeTopOvercommitMemoryQuery", true, true); freed_mem += MemTrackerLimiter::free_top_overcommit_query( - _s_process_minor_gc_size - freed_mem, vm_rss_str, mem_available_str); - } - if (freed_mem > _s_process_minor_gc_size) { - return true; + _s_process_minor_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available, + toq_profile); + if (freed_mem > _s_process_minor_gc_size) { + return true; + } } return false; } @@ -183,48 +161,47 @@ bool MemInfo::process_full_gc() { MonotonicStopWatch watch; watch.start(); int64_t freed_mem = 0; - std::string vm_rss_str = PerfCounters::get_vm_rss_str(); - std::string mem_available_str = MemInfo::sys_mem_available_str(); + std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>(""); + std::string pre_vm_rss = PerfCounters::get_vm_rss_str(); + std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { je_purge_all_arena_dirty_pages(); - LOG(INFO) << fmt::format("End Full GC Free, Memory {} Bytes. cost(us): {}", freed_mem, - watch.elapsed_time() / 1000); + std::stringstream ss; + profile->pretty_print(&ss); + LOG(INFO) << fmt::format("End Full GC Free, Memory {} Bytes. 
cost(us): {}, details: {}", + freed_mem, watch.elapsed_time() / 1000, ss.str()); }}; - MemInfo::process_cache_gc(freed_mem); + freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get()); + je_purge_all_arena_dirty_pages(); if (freed_mem > _s_process_full_gc_size) { return true; } - if (SegmentLoader::instance()->segment_cache_get_usage_ratio() > 0.1) { - freed_mem += SegmentLoader::instance()->segment_cache_mem_consumption(); - LOG(INFO) << "prune all " << SegmentLoader::instance()->segment_cache_get_usage() - << " entries in segment cache."; - SegmentLoader::instance()->prune_all(); - if (freed_mem > _s_process_full_gc_size) { - return true; - } - } - - freed_mem += tg_soft_memory_limit_gc(_s_process_full_gc_size - freed_mem); + RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true); + freed_mem += tg_soft_memory_limit_gc(_s_process_full_gc_size - freed_mem, tg_profile); if (freed_mem > _s_process_full_gc_size) { return true; } VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top memory query in Full GC", MemTrackerLimiter::Type::QUERY); - freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem, - vm_rss_str, mem_available_str); + RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery", true, true); + freed_mem += MemTrackerLimiter::free_top_memory_query( + _s_process_full_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available, tmq_profile); if (freed_mem > _s_process_full_gc_size) { return true; } - VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( - "Before free top memory overcommit load in Full GC", MemTrackerLimiter::Type::LOAD); if (config::enable_query_memory_overcommit) { + VLOG_NOTICE << MemTrackerLimiter::type_detail_usage( + "Before free top memory overcommit load in Full GC", MemTrackerLimiter::Type::LOAD); + RuntimeProfile* tol_profile = + profile->create_child("FreeTopMemoryOvercommitLoad", true, true); freed_mem += MemTrackerLimiter::free_top_overcommit_load( - _s_process_full_gc_size - freed_mem, vm_rss_str, mem_available_str); + _s_process_full_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available, + tol_profile); if (freed_mem > _s_process_full_gc_size) { return true; } @@ -232,8 +209,9 @@ bool MemInfo::process_full_gc() { VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top memory load in Full GC", MemTrackerLimiter::Type::LOAD); - freed_mem += MemTrackerLimiter::free_top_memory_load(_s_process_full_gc_size - freed_mem, - vm_rss_str, mem_available_str); + RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad", true, true); + freed_mem += MemTrackerLimiter::free_top_memory_load( + _s_process_full_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available, tml_profile); if (freed_mem > _s_process_full_gc_size) { return true; } @@ -256,12 +234,12 @@ int64_t MemInfo::tg_hard_memory_limit_gc() { auto used = task_group->memory_used(); total_free_memory += MemTrackerLimiter::tg_memory_limit_gc( used - tg_info.memory_limit, used, tg_info.id, tg_info.name, tg_info.memory_limit, - task_group->mem_tracker_limiter_pool()); + task_group->mem_tracker_limiter_pool(), nullptr); } return total_free_memory; } -int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory) { +int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory, RuntimeProfile* profile) { std::vector<taskgroup::TaskGroupPtr> task_groups; ExecEnv::GetInstance()->task_group_manager()->get_resource_groups( [](const taskgroup::TaskGroupPtr& task_group) { @@ 
-298,7 +276,7 @@ int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory) { task_group->task_group_info(&tg_info); total_free_memory += MemTrackerLimiter::tg_memory_limit_gc( tg_need_free_memory, used_memorys[i], tg_info.id, tg_info.name, - tg_info.memory_limit, task_group->mem_tracker_limiter_pool()); + tg_info.memory_limit, task_group->mem_tracker_limiter_pool(), profile); } return total_free_memory; } diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 542e0fd430..c83068befd 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -42,6 +42,8 @@ namespace doris { +class RuntimeProfile; + // Provides the amount of physical memory available. // Populated from /proc/meminfo. // TODO: Combine mem-info, cpu-info and disk-info into hardware-info/perf_counters ? @@ -165,13 +167,11 @@ public: static std::string debug_string(); - static void process_cache_gc(int64_t& freed_mem); static bool process_minor_gc(); static bool process_full_gc(); static int64_t tg_hard_memory_limit_gc(); - - static int64_t tg_soft_memory_limit_gc(int64_t request_free_memory); + static int64_t tg_soft_memory_limit_gc(int64_t request_free_memory, RuntimeProfile* profile); // It is only used after the memory limit is exceeded. When multiple threads are waiting for the available memory of the process, // avoid multiple threads starting at the same time and causing OOM. diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h index d23418625a..89f126ae6f 100644 --- a/be/src/util/obj_lru_cache.h +++ b/be/src/util/obj_lru_cache.h @@ -24,6 +24,7 @@ namespace doris { // A common object cache depends on a Sharded LRU Cache. // It has a certain capacity, which determines how many objects it can cache. // Caller must hold a CacheHandle instance when visiting the cached object. +// TODO: should add gc prune class ObjLRUCache { public: struct ObjKey { diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index d15552824e..9167c7df9f 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -49,7 +49,8 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t size, doris::thread_context()->thread_mem_tracker()->label(), doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(), doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str()); - if (size > 1024l * 1024 * 1024 && !doris::enable_thread_catch_bad_alloc) { // 1G + if (size > 1024l * 1024 * 1024 && !doris::enable_thread_catch_bad_alloc && + !doris::config::disable_memory_gc) { // 1G err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace(); } diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index ae8f0a7fe1..ab3b0b1ea0 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -38,6 +38,7 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->init_mem_tracker(); doris::thread_context()->thread_mem_tracker_mgr->init(); + doris::CacheManager::create_global_instance(); doris::TabletSchemaCache::create_global_schema_cache(); doris::StoragePageCache::create_global_cache(1 << 30, 10, 0); doris::SegmentLoader::create_global_instance(1000);
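
The core of this change is the new CachePolicy abstraction plus a process-wide CacheManager registry under be/src/runtime/memory/, which is why mem_info.cpp no longer includes page_cache.h, inverted_index_cache.h, or segment_loader.h. The sketch below is a minimal approximation of that shape, not the committed code: apart from the names CachePolicy, CacheManager, prune_stale, prune_all, and for_each_cache_prune_*, which appear in this diff, every detail here (the registration call, the locking, the omitted RuntimeProfile argument) is an assumption.

#include <cstdint>
#include <mutex>
#include <vector>

// Minimal sketch: every cache implements CachePolicy so GC can prune all
// caches without knowing their concrete types. RuntimeProfile plumbing omitted.
class CachePolicy {
public:
    virtual ~CachePolicy() = default;
    // Evict only entries idle past the cache's stale sweep interval (minor GC).
    virtual int64_t prune_stale() = 0;
    // Evict everything (full GC).
    virtual int64_t prune_all() = 0;
};

class CacheManager {
public:
    static CacheManager* instance() {
        static CacheManager mgr; // the patch uses an explicit create_global_instance() instead
        return &mgr;
    }
    // Hypothetical registration hook, presumably invoked from cache constructors.
    void register_cache(CachePolicy* cache) {
        std::lock_guard<std::mutex> lock(_mutex);
        _caches.push_back(cache);
    }
    int64_t for_each_cache_prune_stale() {
        return for_each([](CachePolicy* c) { return c->prune_stale(); });
    }
    int64_t for_each_cache_prune_all() {
        return for_each([](CachePolicy* c) { return c->prune_all(); });
    }

private:
    template <typename Fn>
    int64_t for_each(Fn fn) {
        std::lock_guard<std::mutex> lock(_mutex);
        int64_t freed = 0;
        for (CachePolicy* cache : _caches) {
            freed += fn(cache);
        }
        return freed;
    }
    std::mutex _mutex;
    std::vector<CachePolicy*> _caches;
};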
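
Both GC entry points now follow the same staged, early-exit structure: run a stage, add what it freed, and stop as soon as the goal is met. In this patch, minor GC's stages are (1) prune stale cache entries, (2) workload-group soft-limit GC, (3) cancel top overcommit queries; full GC swaps stage 1 for a prune-all and appends top-memory queries, top overcommit loads, and top-memory loads. A generic restatement of that control flow (an illustrative helper, not code from the commit):

#include <cstdint>
#include <functional>
#include <vector>

// Illustrative only: the staged, early-exit shape shared by
// process_minor_gc() and process_full_gc() after this patch. Each stage is
// handed the amount still needed and returns the bytes it freed.
bool run_gc_stages(int64_t goal_bytes,
                   const std::vector<std::function<int64_t(int64_t)>>& stages) {
    int64_t freed = 0;
    for (const auto& stage : stages) {
        freed += stage(goal_bytes - freed);
        if (freed > goal_bytes) {
            return true; // enough memory reclaimed; skip the remaining stages
        }
    }
    return false; // GC goal not reached even after all stages ran
}

The ordering visible in the diff puts cache pruning first and query/load cancelation last, so the least disruptive work always runs before any task is canceled.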
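
Each stage also reports into its own RuntimeProfile child ("WorkloadGroup", "FreeTopMemoryQuery", and so on), and the pretty-printed profile becomes the details: payload of the End Minor/Full GC log line. The fragment below shows the counter pattern this relies on; freed_memory_counter and the create_child/pretty_print calls appear in the diff, but the ADD_COUNTER/COUNTER_UPDATE usage is assumed from general Doris BE conventions, not copied from this commit.

#include <memory>
#include <sstream>

#include "util/runtime_profile.h" // Doris BE header

// Assumed-typical wiring: one RuntimeProfile child per GC stage, with a
// bytes counter updated as memory is freed.
void report_stage_example() {
    auto profile = std::make_unique<doris::RuntimeProfile>("");
    doris::RuntimeProfile* stage = profile->create_child("FreeTopMemoryQuery", true, true);
    doris::RuntimeProfile::Counter* freed_memory_counter =
            ADD_COUNTER(stage, "FreedMemory", TUnit::BYTES);
    COUNTER_UPDATE(freed_memory_counter, 1024L * 1024); // pretend a canceled query freed 1 MB
    std::stringstream ss;
    profile->pretty_print(&ss); // ss.str() is what the GC log prints after "details:"
}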
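
Finally, the run_all_tests.cpp hunk encodes an initialization-order requirement worth calling out: since caches presumably register themselves with the CacheManager as they are constructed, the manager's global instance must exist before any cache does. Restating the ordering from the test diff (the comment is mine, the calls are verbatim):

// Order matters after this patch: create the CacheManager before any cache
// that will register with it (calls as they appear in run_all_tests.cpp).
doris::CacheManager::create_global_instance();
doris::TabletSchemaCache::create_global_schema_cache();
doris::StoragePageCache::create_global_cache(1 << 30, 10, 0);
doris::SegmentLoader::create_global_instance(1000);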