This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b194a7cf83 [improvement](memory) Support GC segment cache, when memory insufficient (#16987) b194a7cf83 is described below commit b194a7cf8324fdb551d51fc0519c56df5888c17e Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Feb 22 18:31:20 2023 +0800 [improvement](memory) Support GC segment cache, when memory insufficient (#16987) fix segment cache memory tracker statistics support GC --- be/src/olap/lru_cache.cpp | 11 +++++++---- be/src/olap/lru_cache.h | 11 +++++++---- be/src/olap/rowset/segment_v2/segment.cpp | 2 ++ be/src/olap/rowset/segment_v2/segment.h | 2 ++ be/src/olap/segment_loader.cpp | 10 ++++++++-- be/src/olap/segment_loader.h | 2 ++ be/src/util/mem_info.cpp | 8 ++++++++ be/test/olap/lru_cache_test.cpp | 18 +++++++++--------- 8 files changed, 45 insertions(+), 19 deletions(-) diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index 11f504f241..00dff82d1f 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -351,7 +351,7 @@ bool LRUCache::_check_element_count_limit() { Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), - MemTrackerLimiter* tracker, CachePriority priority) { + MemTrackerLimiter* tracker, CachePriority priority, size_t bytes) { size_t handle_size = sizeof(LRUHandle) - 1 + key.size(); LRUHandle* e = reinterpret_cast<LRUHandle*>(malloc(handle_size)); e->value = value; @@ -359,16 +359,19 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, e->charge = charge; e->key_length = key.size(); e->total_size = (_type == LRUCacheType::SIZE ? handle_size + charge : 1); + DCHECK(_type == LRUCacheType::SIZE || bytes != -1) << " _type " << _type; + e->bytes = (_type == LRUCacheType::SIZE ? handle_size + charge : handle_size + bytes); e->hash = hash; e->refs = 2; // one for the returned handle, one for LRUCache. 
e->next = e->prev = nullptr; e->in_cache = true; e->priority = priority; e->mem_tracker = tracker; + e->type = _type; memcpy(e->key_data, key.data(), key.size()); // The memory of the parameter value should be recorded in the tls mem tracker, // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker. - THREAD_MEM_TRACKER_TRANSFER_TO(e->total_size, tracker); + THREAD_MEM_TRACKER_TRANSFER_TO(e->bytes, tracker); LRUHandle* to_remove_head = nullptr; { std::lock_guard l(_mutex); @@ -568,10 +571,10 @@ ShardedLRUCache::~ShardedLRUCache() { Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), - CachePriority priority) { + CachePriority priority, size_t bytes) { const uint32_t hash = _hash_slice(key); return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, _mem_tracker.get(), - priority); + priority, bytes); } Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) { diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h index e988342cab..50749db662 100644 --- a/be/src/olap/lru_cache.h +++ b/be/src/olap/lru_cache.h @@ -175,7 +175,7 @@ public: // value will be passed to "deleter". virtual Handle* insert(const CacheKey& key, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), - CachePriority priority = CachePriority::NORMAL) = 0; + CachePriority priority = CachePriority::NORMAL, size_t bytes = -1) = 0; // If the cache has no mapping for "key", returns nullptr. // @@ -240,12 +240,14 @@ typedef struct LRUHandle { size_t charge; size_t key_length; size_t total_size; // including key length + size_t bytes; // Used by LRUCacheType::NUMBER, LRUCacheType::SIZE equal to total_size. bool in_cache; // Whether entry is in the cache. 
uint32_t refs; uint32_t hash; // Hash of key(); used for fast sharding and comparisons CachePriority priority = CachePriority::NORMAL; MemTrackerLimiter* mem_tracker; char key_data[1]; // Beginning of key + LRUCacheType type; CacheKey key() const { // For cheaper lookups, we allow a temporary Handle object @@ -259,7 +261,7 @@ typedef struct LRUHandle { void free() { (*deleter)(key(), value); - THREAD_MEM_TRACKER_TRANSFER_FROM(total_size, mem_tracker); + THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker); ::free(this); } @@ -332,7 +334,7 @@ public: Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), MemTrackerLimiter* tracker, - CachePriority priority = CachePriority::NORMAL); + CachePriority priority = CachePriority::NORMAL, size_t bytes = -1); Cache::Handle* lookup(const CacheKey& key, uint32_t hash); void release(Cache::Handle* handle); void erase(const CacheKey& key, uint32_t hash); @@ -398,7 +400,8 @@ public: virtual ~ShardedLRUCache(); virtual Handle* insert(const CacheKey& key, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value), - CachePriority priority = CachePriority::NORMAL) override; + CachePriority priority = CachePriority::NORMAL, + size_t bytes = -1) override; virtual Handle* lookup(const CacheKey& key) override; virtual void release(Handle* handle) override; virtual void erase(const CacheKey& key) override; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 09e294496c..c0f4d7fa89 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -198,6 +198,7 @@ Status Segment::_load_pk_bloom_filter() { return _load_pk_bf_once.call([this] { RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, _footer.primary_key_index_meta())); _meta_mem_usage += _pk_index_reader->get_bf_memory_size(); + 
_segment_meta_mem_tracker->consume(_pk_index_reader->get_bf_memory_size()); return Status::OK(); }); } @@ -214,6 +215,7 @@ Status Segment::load_index() { RETURN_IF_ERROR( _pk_index_reader->parse_index(_file_reader, _footer.primary_key_index_meta())); _meta_mem_usage += _pk_index_reader->get_memory_size(); + _segment_meta_mem_tracker->consume(_pk_index_reader->get_memory_size()); return Status::OK(); } else { // read and parse short key index page diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index 47e5042f7b..c8414566fc 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -116,6 +116,8 @@ public: io::FileReaderSPtr file_reader() { return _file_reader; } + int64_t meta_mem_usage() const { return _meta_mem_usage; } + private: DISALLOW_COPY_AND_ASSIGN(Segment); Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema); diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index a83143e6f9..3e604adf17 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -32,7 +32,8 @@ void SegmentLoader::create_global_instance(size_t capacity) { } SegmentLoader::SegmentLoader(size_t capacity) { - _cache = std::unique_ptr<Cache>(new_lru_cache("SegmentCache", capacity, LRUCacheType::NUMBER)); + _cache = std::unique_ptr<Cache>( + new_lru_cache("SegmentMetaCache", capacity, LRUCacheType::NUMBER)); } bool SegmentLoader::_lookup(const SegmentLoader::CacheKey& key, SegmentCacheHandle* handle) { @@ -52,8 +53,13 @@ void SegmentLoader::_insert(const SegmentLoader::CacheKey& key, SegmentLoader::C delete cache_value; }; + int64_t meta_mem_usage = 0; + for (auto segment : value.segments) { + meta_mem_usage += segment->meta_mem_usage(); + } + auto lru_handle = _cache->insert(key.encode(), &value, sizeof(SegmentLoader::CacheValue), - deleter, CachePriority::NORMAL); + deleter, CachePriority::NORMAL, meta_mem_usage); *handle = 
SegmentCacheHandle(_cache.get(), lru_handle); } diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index d2da517d4c..8535e69281 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -87,6 +87,8 @@ public: // Try to prune the segment cache if expired. Status prune(); + int64_t prune_all() { return _cache->prune(); }; + int64_t segment_cache_mem_consumption() { return _cache->mem_consumption(); } private: SegmentLoader(); diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 4ba9883eaa..8c995b3b31 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -35,6 +35,7 @@ #include "common/config.h" #include "gutil/strings/split.h" #include "olap/page_cache.h" +#include "olap/segment_loader.h" #include "util/cgroup_util.h" #include "util/parse_util.h" #include "util/pretty_printer.h" @@ -97,6 +98,8 @@ void MemInfo::process_cache_gc(int64_t& freed_mem) { freed_mem += StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); + // TODO add freed_mem + SegmentLoader::instance()->prune(); } // step1: free all cache @@ -134,6 +137,11 @@ bool MemInfo::process_full_gc() { if (freed_mem > _s_process_full_gc_size) { return true; } + freed_mem += SegmentLoader::instance()->segment_cache_mem_consumption(); + SegmentLoader::instance()->prune_all(); + if (freed_mem > _s_process_full_gc_size) { + return true; + } freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem); if (freed_mem > _s_process_full_gc_size) { return true; diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp index bafe4e7288..c779996c54 100644 --- a/be/test/olap/lru_cache_test.cpp +++ b/be/test/olap/lru_cache_test.cpp @@ -224,7 +224,7 @@ static void insert_LRUCache(LRUCache& cache, const CacheKey& key, int value, static std::unique_ptr<MemTrackerLimiter> lru_cache_tracker = 
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "TestLruCache"); cache.release(cache.insert(key, hash, EncodeValue(value), value, &deleter, - lru_cache_tracker.get(), priority)); + lru_cache_tracker.get(), priority, value)); } TEST_F(CacheTest, Usage) { @@ -232,34 +232,34 @@ TEST_F(CacheTest, Usage) { cache.set_capacity(1040); // The lru usage is handle_size + charge. - // handle_size = sizeof(handle) - 1 + key size = 96 - 1 + 3 = 98 + // handle_size = sizeof(handle) - 1 + key size = 104 - 1 + 3 = 106 CacheKey key1("100"); insert_LRUCache(cache, key1, 100, CachePriority::NORMAL); - ASSERT_EQ(198, cache.get_usage()); // 100 + 98 + ASSERT_EQ(206, cache.get_usage()); // 100 + 106 CacheKey key2("200"); insert_LRUCache(cache, key2, 200, CachePriority::DURABLE); - ASSERT_EQ(496, cache.get_usage()); // 198 + 298(d), d = DURABLE + ASSERT_EQ(512, cache.get_usage()); // 206 + 306(d), d = DURABLE CacheKey key3("300"); insert_LRUCache(cache, key3, 300, CachePriority::NORMAL); - ASSERT_EQ(894, cache.get_usage()); // 198 + 298(d) + 398 + ASSERT_EQ(918, cache.get_usage()); // 206 + 306(d) + 406 CacheKey key4("400"); insert_LRUCache(cache, key4, 400, CachePriority::NORMAL); - ASSERT_EQ(796, cache.get_usage()); // 298(d) + 498, evict 198 398 + ASSERT_EQ(812, cache.get_usage()); // 306(d) + 506, evict 206 406 CacheKey key5("500"); insert_LRUCache(cache, key5, 500, CachePriority::NORMAL); - ASSERT_EQ(896, cache.get_usage()); // 298(d) + 598, evict 498 + ASSERT_EQ(912, cache.get_usage()); // 306(d) + 606, evict 506 CacheKey key6("600"); insert_LRUCache(cache, key6, 600, CachePriority::NORMAL); - ASSERT_EQ(996, cache.get_usage()); // 298(d) + 698, evict 498 + ASSERT_EQ(1012, cache.get_usage()); // 306(d) + 706, evict 506 CacheKey key7("950"); insert_LRUCache(cache, key7, 950, CachePriority::DURABLE); - ASSERT_EQ(0, cache.get_usage()); // evict 298 698, because 950 + 98 > 1040, so insert failed + ASSERT_EQ(0, cache.get_usage()); // evict 306 706, because 950 + 106 > 
1040, so insert failed } TEST_F(CacheTest, Prune) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org