This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 20c2dd08c93 [enhancement](cloud) file cache evict in advance (#47473)
20c2dd08c93 is described below

commit 20c2dd08c9392cd0e9c59897718718f052e6bcee
Author: zhengyu <zhangzhen...@selectdb.com>
AuthorDate: Fri Feb 7 22:56:46 2025 +0800

    [enhancement](cloud) file cache evict in advance (#47473)

    Evict in advance if the current cache size is over a threshold, to avoid
    synchronous eviction during queries, which may hurt query performance.
---
 be/src/common/config.cpp                   |   6 +
 be/src/common/config.h                     |   5 +
 be/src/io/cache/block_file_cache.cpp       | 146 +++++++++++++++--
 be/src/io/cache/block_file_cache.h         |  37 ++++-
 be/test/io/cache/block_file_cache_test.cpp | 243 ++++++++++++++++++++++++++++-
 5 files changed, 416 insertions(+), 21 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a0123ab90b6..216ae0d76b7 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1055,6 +1055,12 @@ DEFINE_Bool(clear_file_cache, "false");
 DEFINE_Bool(enable_file_cache_query_limit, "false");
 DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88");
 DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
+DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
+DEFINE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent, "78");
+DEFINE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent, "75");
+DEFINE_mInt32(file_cache_evict_in_advance_interval_ms, "1000");
+DEFINE_mInt64(file_cache_evict_in_advance_batch_bytes, "31457280"); // 30MB
+
 DEFINE_mBool(enable_read_cache_file_directly, "false");
 DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
 // If true, evict the ttl cache using LRU when full.

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0a6d0c0980c..db1cad729c8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1095,6 +1095,11 @@ DECLARE_Bool(clear_file_cache);
 DECLARE_Bool(enable_file_cache_query_limit);
 DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
 DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
+DECLARE_mBool(enable_evict_file_cache_in_advance);
+DECLARE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent);
+DECLARE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent);
+DECLARE_mInt32(file_cache_evict_in_advance_interval_ms);
+DECLARE_mInt64(file_cache_evict_in_advance_batch_bytes);
 DECLARE_mBool(enable_read_cache_file_directly);
 DECLARE_Bool(file_cache_enable_evict_from_other_queue_by_size);
 // If true, evict the ttl cache using LRU when full.
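For orientation, the new knobs above form two pairs: the enter/exit percentages act as
high/low watermarks with hysteresis for the "need evict in advance" flag, and the
interval/batch pair paces how much the background thread trims the cache per tick.
Below is a minimal, self-contained C++ sketch of that interplay under simplified
assumptions; Cache, run_evict_in_advance, and the toy byte counts are illustrative
stand-ins, not the BlockFileCache API shown in the diff that follows.

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

namespace sketch {

// Stand-ins for the config values introduced above (same defaults).
constexpr int enter_percent = 78;          // ..._enter_need_evict_cache_in_advance_percent
constexpr int exit_percent = 75;           // ..._exit_need_evict_cache_in_advance_percent
constexpr int interval_ms = 1000;          // file_cache_evict_in_advance_interval_ms
constexpr int64_t batch_bytes = 31457280;  // file_cache_evict_in_advance_batch_bytes (30MB)

struct Cache {
    int64_t capacity = 100LL << 20;  // 100MB toy cache
    int64_t cur_size = 90LL << 20;   // start at 90% full
    std::atomic_bool need_evict_in_advance {false};

    // Monitor side: enter advance-eviction mode at >= enter_percent usage and
    // leave it only once usage drops below exit_percent, so the flag does not
    // flap around a single watermark.
    void check_need_evict_in_advance() {
        int used_percent = static_cast<int>(cur_size * 100 / capacity);
        if (used_percent >= enter_percent) {
            need_evict_in_advance = true;
        } else if (need_evict_in_advance && used_percent < exit_percent) {
            need_evict_in_advance = false;
        }
    }

    // Eviction side: trim at most one batch; the real cache walks its LRU
    // queues here instead of doing plain arithmetic.
    void evict_batch(int64_t bytes) { cur_size = std::max<int64_t>(0, cur_size - bytes); }
};

// Background loop: once per interval, evict a single batch while the flag is
// set, so queries do not have to evict synchronously on a cache miss.
void run_evict_in_advance(Cache& cache, const std::atomic_bool& stop) {
    while (!stop) {
        std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms));
        cache.check_need_evict_in_advance();
        if (cache.need_evict_in_advance) {
            cache.evict_batch(batch_bytes);
        }
    }
}

} // namespace sketch

int main() {
    sketch::Cache cache;
    std::atomic_bool stop {false};
    std::thread worker([&] { sketch::run_evict_in_advance(cache, stop); });
    std::this_thread::sleep_for(std::chrono::seconds(3));
    stop = true;
    worker.join();
    std::cout << "bytes cached after advance eviction: " << cache.cur_size << std::endl;
    return 0;
}

The gap between 78 (enter) and 75 (exit) is what keeps the flag stable when usage hovers
near a single threshold, while the interval/batch pair bounds how much work, and how much
cache-lock time, each background tick may spend evicting.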
diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 693cd49bb80..f70bd7d1beb 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -201,6 +201,8 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, "file_cache_hit_ratio_1h", 0.0); _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>( _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0); + _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>( + _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0); _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>( _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us"); @@ -212,6 +214,11 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us"); _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>( _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us"); + _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>( + _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us"); + + _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>( + _cache_base_path.c_str(), "file_cache_recycle_keys_length"); _disposable_queue = LRUQueue(cache_settings.disposable_queue_size, cache_settings.disposable_queue_elements, 60 * 60); @@ -339,6 +346,8 @@ Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this); _cache_background_ttl_gc_thread = std::thread(&BlockFileCache::run_background_ttl_gc, this); _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this); + _cache_background_evict_in_advance_thread = + std::thread(&BlockFileCache::run_background_evict_in_advance, this); return Status::OK(); } @@ -1021,6 +1030,16 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& return true; } +void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) { + UInt128Wrapper hash = UInt128Wrapper(); + size_t offset = 0; + CacheContext context; + context.cache_type = FileCacheType::NORMAL; + try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, false); + context.cache_type = FileCacheType::TTL; + try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, false); +} + bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, bool remove_directly, std::lock_guard<std::mutex>& cache_lock, bool sync) { auto& ttl_queue = get_queue(FileCacheType::TTL); @@ -1178,7 +1197,7 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size bool BlockFileCache::try_reserve_from_other_queue_by_time_interval( FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size, - int64_t cur_time, std::lock_guard<std::mutex>& cache_lock) { + int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool sync_removal) { size_t removed_size = 0; size_t cur_cache_size = _cur_cache_size; std::vector<FileBlockCell*> to_evict; @@ -1211,7 +1230,7 @@ bool BlockFileCache::try_reserve_from_other_queue_by_time_interval( } *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type; } - remove_file_blocks(to_evict, cache_lock, true); + remove_file_blocks(to_evict, cache_lock, sync_removal); return 
!is_overflow(removed_size, size, cur_cache_size); } @@ -1229,7 +1248,7 @@ bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, bool BlockFileCache::try_reserve_from_other_queue_by_size( FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size, - std::lock_guard<std::mutex>& cache_lock) { + std::lock_guard<std::mutex>& cache_lock, bool sync_removal) { size_t removed_size = 0; size_t cur_cache_size = _cur_cache_size; std::vector<FileBlockCell*> to_evict; @@ -1249,17 +1268,18 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size( cur_removed_size); *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size; } - remove_file_blocks(to_evict, cache_lock, true); + remove_file_blocks(to_evict, cache_lock, sync_removal); return !is_overflow(removed_size, size, cur_cache_size); } bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size, int64_t cur_time, - std::lock_guard<std::mutex>& cache_lock) { + std::lock_guard<std::mutex>& cache_lock, + bool sync_removal) { // currently, TTL cache is not considered as a candidate auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type); bool reserve_success = try_reserve_from_other_queue_by_time_interval( - cur_cache_type, other_cache_types, size, cur_time, cache_lock); + cur_cache_type, other_cache_types, size, cur_time, cache_lock, sync_removal); if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) { return reserve_success; } @@ -1272,14 +1292,15 @@ bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) { return false; } - return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, - cache_lock); + return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock, + sync_removal); } bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash, QueryFileCacheContextPtr query_context, const CacheContext& context, size_t offset, size_t size, - std::lock_guard<std::mutex>& cache_lock) { + std::lock_guard<std::mutex>& cache_lock, + bool sync_removal) { int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>( std::chrono::steady_clock::now().time_since_epoch()) .count(); @@ -1292,7 +1313,7 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash, size_t cur_removed_size = 0; find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock, cur_removed_size); - remove_file_blocks(to_evict, cache_lock, true); + remove_file_blocks(to_evict, cache_lock, sync_removal); *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size; if (is_overflow(removed_size, size, cur_cache_size)) { @@ -1345,7 +1366,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo // so there will be a window that the file is not in the cache but still in the storage // but it's ok, because the rowset is stale already bool ret = _recycle_keys.enqueue(key); - if (!ret) { + if (ret) [[likely]] { + *_recycle_keys_length_recorder << _recycle_keys.size_approx(); + } else { LOG_WARNING("Failed to push recycle key to queue, do it synchronously"); int64_t duration_ns = 0; Status st; @@ -1551,6 +1574,10 @@ int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) int inode_percentage = int(inode_free * 1.0 / inode_total * 100); percent->first = capacity_percentage; 
percent->second = 100 - inode_percentage; + + // Add sync point for testing + TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent); + return 0; } @@ -1643,7 +1670,7 @@ void BlockFileCache::check_disk_resource_limit() { LOG_WARNING("config error, set to default value") .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent) .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent); - config::file_cache_enter_disk_resource_limit_mode_percent = 90; + config::file_cache_enter_disk_resource_limit_mode_percent = 88; config::file_cache_exit_disk_resource_limit_mode_percent = 80; } if (is_insufficient(space_percentage) || is_insufficient(inode_percentage)) { @@ -1664,11 +1691,69 @@ void BlockFileCache::check_disk_resource_limit() { } } +void BlockFileCache::check_need_evict_cache_in_advance() { + if (_storage->get_type() != FileCacheStorageType::DISK) { + return; + } + + std::pair<int, int> percent; + int ret = disk_used_percentage(_cache_base_path, &percent); + if (ret != 0) { + LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno)); + return; + } + auto [space_percentage, inode_percentage] = percent; + size_t size_percentage = static_cast<size_t>( + (static_cast<double>(_cur_cache_size) / static_cast<double>(_capacity)) * 100); + auto is_insufficient = [](const int& percentage) { + return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent; + }; + DCHECK_GE(space_percentage, 0); + DCHECK_LE(space_percentage, 100); + DCHECK_GE(inode_percentage, 0); + DCHECK_LE(inode_percentage, 100); + // ATTN: due to that can be changed dynamically, set it to default value if it's invalid + // FIXME: reject with config validator + if (config::file_cache_enter_need_evict_cache_in_advance_percent <= + config::file_cache_exit_need_evict_cache_in_advance_percent) { + LOG_WARNING("config error, set to default value") + .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent) + .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent); + config::file_cache_enter_need_evict_cache_in_advance_percent = 78; + config::file_cache_exit_need_evict_cache_in_advance_percent = 75; + } + if (is_insufficient(space_percentage) || is_insufficient(inode_percentage) || + is_insufficient(size_percentage)) { + _need_evict_cache_in_advance = true; + _need_evict_cache_in_advance_metrics->set_value(1); + } else if (_need_evict_cache_in_advance && + (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) && + (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) && + (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) { + _need_evict_cache_in_advance = false; + _need_evict_cache_in_advance_metrics->set_value(0); + } + if (_need_evict_cache_in_advance) { + LOG(WARNING) << "file_cache=" << get_base_path() << " space_percent=" << space_percentage + << " inode_percent=" << inode_percentage << " size_percent=" << size_percentage + << " is_space_insufficient=" << is_insufficient(space_percentage) + << " is_inode_insufficient=" << is_insufficient(inode_percentage) + << " is_size_insufficient=" << is_insufficient(size_percentage) + << " need evict cache in advance"; + } +} + void BlockFileCache::run_background_monitor() { int64_t interval_time_seconds = 20; while (!_close) { TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds); check_disk_resource_limit(); + if 
(config::enable_evict_file_cache_in_advance) { + check_need_evict_cache_in_advance(); + } else { + _need_evict_cache_in_advance = false; + } + { std::unique_lock close_lock(_close_mtx); _close_cv.wait_for(close_lock, std::chrono::seconds(interval_time_seconds)); @@ -1753,11 +1838,8 @@ void BlockFileCache::run_background_gc() { break; } } - while (_recycle_keys.try_dequeue(key)) { - if (batch_count >= batch_limit) { - break; - } + while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) { int64_t duration_ns = 0; Status st; { @@ -1771,10 +1853,42 @@ void BlockFileCache::run_background_gc() { } batch_count++; } + *_recycle_keys_length_recorder << _recycle_keys.size_approx(); batch_count = 0; } } +void BlockFileCache::run_background_evict_in_advance() { + LOG(INFO) << "Starting background evict in advance thread"; + int64_t batch = 0; + while (!_close) { + { + std::unique_lock close_lock(_close_mtx); + _close_cv.wait_for( + close_lock, + std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms)); + if (_close) { + LOG(INFO) << "Background evict in advance thread exiting due to cache closing"; + break; + } + } + batch = config::file_cache_evict_in_advance_batch_bytes; + + // Skip if eviction not needed or too many pending recycles + if (!_need_evict_cache_in_advance || _recycle_keys.size_approx() >= (batch * 10)) { + continue; + } + + int64_t duration_ns = 0; + { + SCOPED_CACHE_LOCK(_mutex, this); + SCOPED_RAW_TIMER(&duration_ns); + try_evict_in_advance(batch, cache_lock); + } + *_evict_in_advance_latency_us << (duration_ns / 1000); + } +} + void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, uint64_t new_expiration_time) { SCOPED_CACHE_LOCK(_mutex, this); diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index ce8d13d4a14..5b998708241 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -109,6 +109,9 @@ public: if (_cache_background_gc_thread.joinable()) { _cache_background_gc_thread.join(); } + if (_cache_background_evict_in_advance_thread.joinable()) { + _cache_background_evict_in_advance_thread.join(); + } } /// Restore cache from local filesystem. @@ -190,6 +193,22 @@ public: bool try_reserve(const UInt128Wrapper& hash, const CacheContext& context, size_t offset, size_t size, std::lock_guard<std::mutex>& cache_lock); + /** + * Proactively evict cache blocks to free up space before cache is full. + * + * This function attempts to evict blocks from both NORMAL and TTL queues to maintain + * cache size below high watermark. Unlike try_reserve() which blocks until space is freed, + * this function initiates asynchronous eviction in background. 
+ * + * @param size Number of bytes to try to evict + * @param cache_lock Lock that must be held while accessing cache data structures + * + * @pre Caller must hold cache_lock + * @pre _need_evict_cache_in_advance must be true + * @pre _recycle_keys queue must have capacity for evicted blocks + */ + void try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock); + void update_ttl_atime(const UInt128Wrapper& hash); std::map<std::string, double> get_stats(); @@ -395,7 +414,7 @@ private: bool try_reserve_for_lru(const UInt128Wrapper& hash, QueryFileCacheContextPtr query_context, const CacheContext& context, size_t offset, size_t size, - std::lock_guard<std::mutex>& cache_lock); + std::lock_guard<std::mutex>& cache_lock, bool sync_removal = true); bool try_reserve_during_async_load(size_t size, std::lock_guard<std::mutex>& cache_lock); @@ -403,7 +422,8 @@ private: std::vector<FileCacheType> get_other_cache_type_without_ttl(FileCacheType cur_cache_type); bool try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t offset, int64_t cur_time, - std::lock_guard<std::mutex>& cache_lock); + std::lock_guard<std::mutex>& cache_lock, + bool sync_removal = true); size_t get_available_cache_size(FileCacheType cache_type) const; @@ -426,6 +446,7 @@ private: std::lock_guard<std::mutex>& cache_lock) const; void check_disk_resource_limit(); + void check_need_evict_cache_in_advance(); size_t get_available_cache_size_unlocked(FileCacheType type, std::lock_guard<std::mutex>& cache_lock) const; @@ -441,15 +462,18 @@ private: void run_background_monitor(); void run_background_ttl_gc(); void run_background_gc(); + void run_background_evict_in_advance(); bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size, int64_t cur_time, - std::lock_guard<std::mutex>& cache_lock); + std::lock_guard<std::mutex>& cache_lock, + bool sync_removal); bool try_reserve_from_other_queue_by_size(FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, - size_t size, std::lock_guard<std::mutex>& cache_lock); + size_t size, std::lock_guard<std::mutex>& cache_lock, + bool sync_removal); bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size) const; @@ -476,9 +500,11 @@ private: std::thread _cache_background_monitor_thread; std::thread _cache_background_ttl_gc_thread; std::thread _cache_background_gc_thread; + std::thread _cache_background_evict_in_advance_thread; std::atomic_bool _async_open_done {false}; // disk space or inode is less than the specified value bool _disk_resource_limit_mode {false}; + bool _need_evict_cache_in_advance {false}; bool _is_initialized {false}; // strategy @@ -536,12 +562,15 @@ private: std::shared_ptr<bvar::Status<double>> _hit_ratio_5m; std::shared_ptr<bvar::Status<double>> _hit_ratio_1h; std::shared_ptr<bvar::Status<size_t>> _disk_limit_mode_metrics; + std::shared_ptr<bvar::Status<size_t>> _need_evict_cache_in_advance_metrics; std::shared_ptr<bvar::LatencyRecorder> _cache_lock_wait_time_us; std::shared_ptr<bvar::LatencyRecorder> _get_or_set_latency_us; std::shared_ptr<bvar::LatencyRecorder> _storage_sync_remove_latency_us; std::shared_ptr<bvar::LatencyRecorder> _storage_retry_sync_remove_latency_us; std::shared_ptr<bvar::LatencyRecorder> _storage_async_remove_latency_us; + std::shared_ptr<bvar::LatencyRecorder> _evict_in_advance_latency_us; + std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder; }; } // namespace doris::io diff --git 
a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index 117f01d63e3..e42b6516d58 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -139,6 +139,10 @@ class BlockFileCacheTest : public testing::Test { public: static void SetUpTestSuite() { config::file_cache_enter_disk_resource_limit_mode_percent = 99; + config::enable_evict_file_cache_in_advance = false; // disable evict in + // advance for most + // cases for simple + // verification bool exists {false}; ASSERT_TRUE(global_local_filesystem()->exists(caches_dir, &exists).ok()); if (!exists) { @@ -4402,7 +4406,7 @@ TEST_F(BlockFileCacheTest, test_check_disk_reource_limit_1) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } std::this_thread::sleep_for(std::chrono::milliseconds(10)); - EXPECT_EQ(config::file_cache_enter_disk_resource_limit_mode_percent, 90); + EXPECT_EQ(config::file_cache_enter_disk_resource_limit_mode_percent, 88); EXPECT_EQ(config::file_cache_exit_disk_resource_limit_mode_percent, 80); config::file_cache_enter_disk_resource_limit_mode_percent = 99; if (fs::exists(cache_base_path)) { @@ -6742,4 +6746,241 @@ TEST_F(BlockFileCacheTest, evict_privilege_order_for_ttl) { } } +TEST_F(BlockFileCacheTest, evict_in_advance) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + auto sp = SyncPoint::get_instance(); + SyncPoint::CallbackGuard guard1; + sp->set_call_back( + "BlockFileCache::set_sleep_time", + [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; }, &guard1); + sp->enable_processing(); + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 3000000; + settings.query_queue_elements = 30000; + settings.index_queue_size = 1000000; + settings.index_queue_elements = 10000; + settings.disposable_queue_size = 1000000; + settings.disposable_queue_elements = 10000; + settings.capacity = 10000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + size_t limit = 1000000; + size_t cache_max = 10000000; + io::CacheContext context; + ReadStatistics rstats; + context.stats = &rstats; + context.cache_type = io::FileCacheType::NORMAL; + context.query_id = query_id; + // int64_t cur_time = UnixSeconds(); + // context.expiration_time = cur_time + 120; + auto key1 = io::BlockFileCache::hash("key1"); + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache.get_async_open_success()); + int64_t offset = 0; + // fill the cache to its limit + for (; offset < limit; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + // grab more exceed the limit to max cache capacity + for (; offset < cache_max; offset += 100000) { + auto holder = 
cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max); + ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(), 0); + + // grab more exceed the cache capacity + size_t exceed = 2000000; + for (; offset < (cache_max + exceed); offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max); + + config::file_cache_evict_in_advance_batch_bytes = 200000; // evict 2 100000 blocks + config::enable_evict_file_cache_in_advance = true; // enable evict in advance + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // wait for clear + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max - 200000); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, test_check_need_evict_cache_in_advance) { + std::string cache_base_path = "./ut_file_cache_dir"; + fs::create_directories(cache_base_path); + + io::FileCacheSettings settings; + settings.capacity = 100_mb; + settings.storage = "disk"; + settings.query_queue_size = 50_mb; + settings.index_queue_size = 20_mb; + settings.disposable_queue_size = 20_mb; + settings.ttl_queue_size = 10_mb; + + // this one for memory storage + { + settings.storage = "memory"; + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_FALSE(cache._need_evict_cache_in_advance); + cache.check_need_evict_cache_in_advance(); + ASSERT_FALSE(cache._need_evict_cache_in_advance); + } + + // the rest for disk + settings.storage = "disk"; + + // bad disk path + { + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_FALSE(cache._need_evict_cache_in_advance); + + cache._cache_base_path = "/non/existent/path/OOXXOO"; + cache.check_need_evict_cache_in_advance(); + ASSERT_FALSE(cache._need_evict_cache_in_advance); + } + + // conditions for enter need evict cache in advance + { + io::BlockFileCache cache(cache_base_path, settings); + 
ASSERT_FALSE(cache._need_evict_cache_in_advance); + + // condition1 space usage rate exceed threshold + config::file_cache_enter_need_evict_cache_in_advance_percent = 70; + config::file_cache_exit_need_evict_cache_in_advance_percent = 65; + + SyncPoint::get_instance()->set_call_back( + "BlockFileCache::disk_used_percentage:1", [](std::vector<std::any>&& values) { + auto* percent = try_any_cast<std::pair<int, int>*>(values.back()); + percent->first = 75; // set high + percent->second = 60; + }); + + SyncPoint::get_instance()->enable_processing(); + cache.check_need_evict_cache_in_advance(); + ASSERT_TRUE(cache._need_evict_cache_in_advance); + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); + + // condition2 inode usage rate exceed threshold + cache._need_evict_cache_in_advance = false; + + SyncPoint::get_instance()->set_call_back( + "BlockFileCache::disk_used_percentage:1", [](std::vector<std::any>&& values) { + auto* percent = try_any_cast<std::pair<int, int>*>(values.back()); + percent->first = 60; + percent->second = 75; // set high + }); + + SyncPoint::get_instance()->enable_processing(); + cache.check_need_evict_cache_in_advance(); + ASSERT_TRUE(cache._need_evict_cache_in_advance); + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); + + // condition3 cache size usage rate exceed threshold + cache._need_evict_cache_in_advance = false; + cache._cur_cache_size = 80_mb; // set high + cache.check_need_evict_cache_in_advance(); + ASSERT_TRUE(cache._need_evict_cache_in_advance); + } + + // conditions for exit need evict cache in advance + { + io::BlockFileCache cache(cache_base_path, settings); + cache._need_evict_cache_in_advance = true; + cache._cur_cache_size = 50_mb; // set low + + SyncPoint::get_instance()->set_call_back( + "BlockFileCache::disk_used_percentage:1", [](std::vector<std::any>&& values) { + auto* percent = try_any_cast<std::pair<int, int>*>(values.back()); + percent->first = 50; // set low + percent->second = 50; // set low + }); + + SyncPoint::get_instance()->enable_processing(); + cache.check_need_evict_cache_in_advance(); + ASSERT_FALSE(cache._need_evict_cache_in_advance); + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); + } + + // config parameter validation + { + io::BlockFileCache cache(cache_base_path, settings); + + // set wrong config value + config::file_cache_enter_need_evict_cache_in_advance_percent = 70; + config::file_cache_exit_need_evict_cache_in_advance_percent = 75; + + cache.check_need_evict_cache_in_advance(); + + // reset to default value + ASSERT_EQ(config::file_cache_enter_need_evict_cache_in_advance_percent, 78); + ASSERT_EQ(config::file_cache_exit_need_evict_cache_in_advance_percent, 75); + } + + fs::remove_all(cache_base_path); +} + } // namespace doris::io --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org