This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 20c2dd08c93 [enhancement](cloud) file cache evict in advance (#47473)
20c2dd08c93 is described below

commit 20c2dd08c9392cd0e9c59897718718f052e6bcee
Author: zhengyu <zhangzhen...@selectdb.com>
AuthorDate: Fri Feb 7 22:56:46 2025 +0800

    [enhancement](cloud) file cache evict in advance (#47473)
    
    Evict in advance when the current cache size is over the threshold, to avoid
    synchronous eviction during queries, which may affect query performance.
---
 be/src/common/config.cpp                   |   6 +
 be/src/common/config.h                     |   5 +
 be/src/io/cache/block_file_cache.cpp       | 146 +++++++++++++++--
 be/src/io/cache/block_file_cache.h         |  37 ++++-
 be/test/io/cache/block_file_cache_test.cpp | 243 ++++++++++++++++++++++++++++-
 5 files changed, 416 insertions(+), 21 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a0123ab90b6..216ae0d76b7 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1055,6 +1055,12 @@ DEFINE_Bool(clear_file_cache, "false");
 DEFINE_Bool(enable_file_cache_query_limit, "false");
 DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88");
 DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
+// If true, the file cache monitor thread checks whether cache should be evicted in advance.
+DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
+// Evict-in-advance is entered when disk space, inode or cache size usage (percent) reaches this.
+DEFINE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent, "78");
+// Evict-in-advance is left once all three usages are below this; must be less than enter.
+DEFINE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent, "75");
+// Wait time in milliseconds between two rounds of the background evict-in-advance thread.
+DEFINE_mInt32(file_cache_evict_in_advance_interval_ms, "1000");
+// Bytes requested from the NORMAL queue and then from the TTL queue in each round.
+DEFINE_mInt64(file_cache_evict_in_advance_batch_bytes, "31457280"); // 30MB
+
 DEFINE_mBool(enable_read_cache_file_directly, "false");
 DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
 // If true, evict the ttl cache using LRU when full.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0a6d0c0980c..db1cad729c8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1095,6 +1095,11 @@ DECLARE_Bool(clear_file_cache);
 DECLARE_Bool(enable_file_cache_query_limit);
 DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
 DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
+// Switch for evicting file cache in advance from a background thread.
+DECLARE_mBool(enable_evict_file_cache_in_advance);
+// Usage percent (disk space, inode or cache size) at which evict-in-advance is entered.
+DECLARE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent);
+// Usage percent below which evict-in-advance is left; must be less than the enter percent.
+DECLARE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent);
+// Wait time in milliseconds between two rounds of the background evict-in-advance thread.
+DECLARE_mInt32(file_cache_evict_in_advance_interval_ms);
+// Bytes the background thread tries to evict per queue in each round.
+DECLARE_mInt64(file_cache_evict_in_advance_batch_bytes);
 DECLARE_mBool(enable_read_cache_file_directly);
 DECLARE_Bool(file_cache_enable_evict_from_other_queue_by_size);
 // If true, evict the ttl cache using LRU when full.
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index 693cd49bb80..f70bd7d1beb 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -201,6 +201,8 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
                                                            
"file_cache_hit_ratio_1h", 0.0);
     _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
             _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
+    _need_evict_cache_in_advance_metrics = 
std::make_shared<bvar::Status<size_t>>(
+            _cache_base_path.c_str(), 
"file_cache_need_evict_cache_in_advance", 0);
 
     _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
             _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
@@ -212,6 +214,11 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
             _cache_base_path.c_str(), 
"file_cache_storage_retry_sync_remove_latency_us");
     _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
             _cache_base_path.c_str(), 
"file_cache_storage_async_remove_latency_us");
+    _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
+            _cache_base_path.c_str(), 
"file_cache_evict_in_advance_latency_us");
+
+    _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
+            _cache_base_path.c_str(), "file_cache_recycle_keys_length");
 
     _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                  cache_settings.disposable_queue_elements, 60 
* 60);
@@ -339,6 +346,8 @@ Status 
BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
     _cache_background_monitor_thread = 
std::thread(&BlockFileCache::run_background_monitor, this);
     _cache_background_ttl_gc_thread = 
std::thread(&BlockFileCache::run_background_ttl_gc, this);
     _cache_background_gc_thread = 
std::thread(&BlockFileCache::run_background_gc, this);
+    _cache_background_evict_in_advance_thread =
+            std::thread(&BlockFileCache::run_background_evict_in_advance, 
this);
 
     return Status::OK();
 }
@@ -1021,6 +1030,16 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& 
hash, const CacheContext&
     return true;
 }
 
+// Early eviction helper: requests `size` bytes from the NORMAL queue and then from the TTL
+// queue through try_reserve_for_lru(), passing sync_removal = false. The boolean results of
+// the two requests are not checked.
+void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
+    // No concrete block is being reserved here, so a default hash and offset 0 are passed.
+    const UInt128Wrapper placeholder_hash = UInt128Wrapper();
+    const size_t placeholder_offset = 0;
+    CacheContext evict_context;
+    auto evict_from_queue = [&](FileCacheType queue_type) {
+        evict_context.cache_type = queue_type;
+        try_reserve_for_lru(placeholder_hash, nullptr, evict_context, placeholder_offset, size,
+                            cache_lock, false);
+    };
+    evict_from_queue(FileCacheType::NORMAL);
+    evict_from_queue(FileCacheType::TTL);
+}
+
 bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, 
bool remove_directly,
                                                std::lock_guard<std::mutex>& 
cache_lock, bool sync) {
     auto& ttl_queue = get_queue(FileCacheType::TTL);
@@ -1178,7 +1197,7 @@ void BlockFileCache::reset_range(const UInt128Wrapper& 
hash, size_t offset, size
 
 bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
         FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, 
size_t size,
-        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock) {
+        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool 
sync_removal) {
     size_t removed_size = 0;
     size_t cur_cache_size = _cur_cache_size;
     std::vector<FileBlockCell*> to_evict;
@@ -1211,7 +1230,7 @@ bool 
BlockFileCache::try_reserve_from_other_queue_by_time_interval(
         }
         *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << 
remove_size_per_type;
     }
-    remove_file_blocks(to_evict, cache_lock, true);
+    remove_file_blocks(to_evict, cache_lock, sync_removal);
 
     return !is_overflow(removed_size, size, cur_cache_size);
 }
@@ -1229,7 +1248,7 @@ bool BlockFileCache::is_overflow(size_t removed_size, 
size_t need_size,
 
 bool BlockFileCache::try_reserve_from_other_queue_by_size(
         FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, 
size_t size,
-        std::lock_guard<std::mutex>& cache_lock) {
+        std::lock_guard<std::mutex>& cache_lock, bool sync_removal) {
     size_t removed_size = 0;
     size_t cur_cache_size = _cur_cache_size;
     std::vector<FileBlockCell*> to_evict;
@@ -1249,17 +1268,18 @@ bool 
BlockFileCache::try_reserve_from_other_queue_by_size(
                               cur_removed_size);
         *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << 
cur_removed_size;
     }
-    remove_file_blocks(to_evict, cache_lock, true);
+    remove_file_blocks(to_evict, cache_lock, sync_removal);
     return !is_overflow(removed_size, size, cur_cache_size);
 }
 
 bool BlockFileCache::try_reserve_from_other_queue(FileCacheType 
cur_cache_type, size_t size,
                                                   int64_t cur_time,
-                                                  std::lock_guard<std::mutex>& 
cache_lock) {
+                                                  std::lock_guard<std::mutex>& 
cache_lock,
+                                                  bool sync_removal) {
     // currently, TTL cache is not considered as a candidate
     auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
     bool reserve_success = try_reserve_from_other_queue_by_time_interval(
-            cur_cache_type, other_cache_types, size, cur_time, cache_lock);
+            cur_cache_type, other_cache_types, size, cur_time, cache_lock, 
sync_removal);
     if (reserve_success || 
!config::file_cache_enable_evict_from_other_queue_by_size) {
         return reserve_success;
     }
@@ -1272,14 +1292,15 @@ bool 
BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type,
     if (_cur_cache_size + size > _capacity && cur_queue_size + size > 
cur_queue_max_size) {
         return false;
     }
-    return try_reserve_from_other_queue_by_size(cur_cache_type, 
other_cache_types, size,
-                                                cache_lock);
+    return try_reserve_from_other_queue_by_size(cur_cache_type, 
other_cache_types, size, cache_lock,
+                                                sync_removal);
 }
 
 bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
                                          QueryFileCacheContextPtr 
query_context,
                                          const CacheContext& context, size_t 
offset, size_t size,
-                                         std::lock_guard<std::mutex>& 
cache_lock) {
+                                         std::lock_guard<std::mutex>& 
cache_lock,
+                                         bool sync_removal) {
     int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
                                
std::chrono::steady_clock::now().time_since_epoch())
                                .count();
@@ -1292,7 +1313,7 @@ bool BlockFileCache::try_reserve_for_lru(const 
UInt128Wrapper& hash,
         size_t cur_removed_size = 0;
         find_evict_candidates(queue, size, cur_cache_size, removed_size, 
to_evict, cache_lock,
                               cur_removed_size);
-        remove_file_blocks(to_evict, cache_lock, true);
+        remove_file_blocks(to_evict, cache_lock, sync_removal);
         *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << 
cur_removed_size;
 
         if (is_overflow(removed_size, size, cur_cache_size)) {
@@ -1345,7 +1366,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& 
cache_lock, U& block_lo
             // so there will be a window that the file is not in the cache but 
still in the storage
             // but it's ok, because the rowset is stale already
             bool ret = _recycle_keys.enqueue(key);
-            if (!ret) {
+            if (ret) [[likely]] {
+                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
+            } else {
                 LOG_WARNING("Failed to push recycle key to queue, do it 
synchronously");
                 int64_t duration_ns = 0;
                 Status st;
@@ -1551,6 +1574,10 @@ int disk_used_percentage(const std::string& path, 
std::pair<int, int>* percent)
     int inode_percentage = int(inode_free * 1.0 / inode_total * 100);
     percent->first = capacity_percentage;
     percent->second = 100 - inode_percentage;
+
+    // Add sync point for testing
+    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", 
percent);
+
     return 0;
 }
 
@@ -1643,7 +1670,7 @@ void BlockFileCache::check_disk_resource_limit() {
         LOG_WARNING("config error, set to default value")
                 .tag("enter", 
config::file_cache_enter_disk_resource_limit_mode_percent)
                 .tag("exit", 
config::file_cache_exit_disk_resource_limit_mode_percent);
-        config::file_cache_enter_disk_resource_limit_mode_percent = 90;
+        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
         config::file_cache_exit_disk_resource_limit_mode_percent = 80;
     }
     if (is_insufficient(space_percentage) || 
is_insufficient(inode_percentage)) {
@@ -1664,11 +1691,69 @@ void BlockFileCache::check_disk_resource_limit() {
     }
 }
 
+// Refreshes _need_evict_cache_in_advance (and its bvar metric) for disk-backed caches.
+//
+// The flag is raised as soon as disk space usage, inode usage or cache size usage (all in
+// percent) reaches file_cache_enter_need_evict_cache_in_advance_percent. It is lowered only
+// after all three are below file_cache_exit_need_evict_cache_in_advance_percent, so values
+// between the two thresholds keep the current state. Nothing is changed for non-DISK storage
+// or when the disk usage cannot be queried.
+void BlockFileCache::check_need_evict_cache_in_advance() {
+    if (_storage->get_type() != FileCacheStorageType::DISK) {
+        return;
+    }
+
+    std::pair<int, int> percent;
+    int ret = disk_used_percentage(_cache_base_path, &percent);
+    if (ret != 0) {
+        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
+        return;
+    }
+    auto [space_percentage, inode_percentage] = percent;
+    // Integer arithmetic keeps the percentage exact (the floating point form truncates e.g.
+    // 57 / 100 * 100 to 56) and gives it the same type as the other percentages and the
+    // thresholds. A zero capacity is reported as 0% instead of dividing by zero.
+    // NOTE(review): _cur_cache_size is read here without taking _mutex - confirm that is fine.
+    const auto cur_cache_size = static_cast<size_t>(_cur_cache_size);
+    const auto capacity = static_cast<size_t>(_capacity);
+    const int size_percentage =
+            capacity == 0 ? 0 : static_cast<int>(cur_cache_size * 100 / capacity);
+    auto is_insufficient = [](const int& percentage) {
+        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
+    };
+    DCHECK_GE(space_percentage, 0);
+    DCHECK_LE(space_percentage, 100);
+    DCHECK_GE(inode_percentage, 0);
+    DCHECK_LE(inode_percentage, 100);
+    // ATTN: the thresholds can be changed dynamically, so they are set back to the default
+    // values when they are invalid (enter must be greater than exit)
+    // FIXME: reject with config validator
+    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
+        config::file_cache_exit_need_evict_cache_in_advance_percent) {
+        LOG_WARNING("config error, set to default value")
+                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
+                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
+        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
+        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
+    }
+    if (is_insufficient(space_percentage) || is_insufficient(inode_percentage) ||
+        is_insufficient(size_percentage)) {
+        _need_evict_cache_in_advance = true;
+        _need_evict_cache_in_advance_metrics->set_value(1);
+    } else if (_need_evict_cache_in_advance &&
+               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
+               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
+               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
+        _need_evict_cache_in_advance = false;
+        _need_evict_cache_in_advance_metrics->set_value(0);
+    }
+    // Logged on every call for as long as the flag stays set.
+    if (_need_evict_cache_in_advance) {
+        LOG(WARNING) << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
+                     << " inode_percent=" << inode_percentage << " size_percent=" << size_percentage
+                     << " is_space_insufficient=" << is_insufficient(space_percentage)
+                     << " is_inode_insufficient=" << is_insufficient(inode_percentage)
+                     << " is_size_insufficient=" << is_insufficient(size_percentage)
+                     << " need evict cache in advance";
+    }
+}
+
 void BlockFileCache::run_background_monitor() {
     int64_t interval_time_seconds = 20;
     while (!_close) {
         TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", 
&interval_time_seconds);
         check_disk_resource_limit();
+        if (config::enable_evict_file_cache_in_advance) {
+            check_need_evict_cache_in_advance();
+        } else {
+            _need_evict_cache_in_advance = false;
+        }
+
         {
             std::unique_lock close_lock(_close_mtx);
             _close_cv.wait_for(close_lock, 
std::chrono::seconds(interval_time_seconds));
@@ -1753,11 +1838,8 @@ void BlockFileCache::run_background_gc() {
                 break;
             }
         }
-        while (_recycle_keys.try_dequeue(key)) {
-            if (batch_count >= batch_limit) {
-                break;
-            }
 
+        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
             int64_t duration_ns = 0;
             Status st;
             {
@@ -1771,10 +1853,42 @@ void BlockFileCache::run_background_gc() {
             }
             batch_count++;
         }
+        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
         batch_count = 0;
     }
 }
 
+// Body of the background evict-in-advance thread.
+//
+// The thread waits file_cache_evict_in_advance_interval_ms between rounds. In each round, if
+// _need_evict_cache_in_advance is set, it calls try_evict_in_advance() with
+// file_cache_evict_in_advance_batch_bytes while holding the cache lock and records the time
+// spent. The loop ends once _close is set.
+void BlockFileCache::run_background_evict_in_advance() {
+    LOG(INFO) << "Starting background evict in advance thread";
+    // Default of file_cache_evict_in_advance_interval_ms, used when the config is not positive.
+    constexpr int64_t default_interval_ms = 1000;
+    while (!_close) {
+        {
+            // The config is mutable at runtime and not validated. A non-positive interval
+            // would make wait_for() return at once and turn this loop into a busy spin.
+            int64_t interval_ms = config::file_cache_evict_in_advance_interval_ms;
+            if (interval_ms <= 0) {
+                interval_ms = default_interval_ms;
+            }
+            std::unique_lock close_lock(_close_mtx);
+            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
+            if (_close) {
+                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
+                break;
+            }
+        }
+        const int64_t batch = config::file_cache_evict_in_advance_batch_bytes;
+
+        // Nothing to evict for a non-positive batch. Without this check a negative value
+        // would be converted to a huge size_t in the comparison and in the call below.
+        if (batch <= 0) {
+            continue;
+        }
+        const auto batch_bytes = static_cast<size_t>(batch);
+
+        // Skip if eviction not needed or too many pending recycles
+        // NOTE(review): this compares a number of queued keys with a number of bytes (* 10);
+        // confirm the intended limit, a dedicated key-count threshold may be what is wanted.
+        if (!_need_evict_cache_in_advance || _recycle_keys.size_approx() >= batch_bytes * 10) {
+            continue;
+        }
+
+        int64_t duration_ns = 0;
+        {
+            SCOPED_CACHE_LOCK(_mutex, this);
+            SCOPED_RAW_TIMER(&duration_ns);
+            try_evict_in_advance(batch_bytes, cache_lock);
+        }
+        *_evict_in_advance_latency_us << (duration_ns / 1000);
+    }
+}
+
 void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
                                             uint64_t new_expiration_time) {
     SCOPED_CACHE_LOCK(_mutex, this);
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index ce8d13d4a14..5b998708241 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -109,6 +109,9 @@ public:
         if (_cache_background_gc_thread.joinable()) {
             _cache_background_gc_thread.join();
         }
+        if (_cache_background_evict_in_advance_thread.joinable()) {
+            _cache_background_evict_in_advance_thread.join();
+        }
     }
 
     /// Restore cache from local filesystem.
@@ -190,6 +193,22 @@ public:
     bool try_reserve(const UInt128Wrapper& hash, const CacheContext& context, 
size_t offset,
                      size_t size, std::lock_guard<std::mutex>& cache_lock);
 
+    /**
+     * Proactively evict cache blocks to free up space before cache is full.
+     *
+     * This function attempts to evict blocks from both the NORMAL and the TTL queue to
+     * maintain cache size below high watermark. Unlike try_reserve() which blocks until
+     * space is freed, this function initiates asynchronous eviction in background.
+     *
+     * @param size Number of bytes to try to evict from each of the two queues
+     * @param cache_lock Lock that must be held while accessing cache data structures
+     *
+     * @pre Caller must hold cache_lock
+     * @pre _need_evict_cache_in_advance should be true (not checked by this function)
+     * @pre _recycle_keys queue must have capacity for evicted blocks
+     */
+    void try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& 
cache_lock);
+
     void update_ttl_atime(const UInt128Wrapper& hash);
 
     std::map<std::string, double> get_stats();
@@ -395,7 +414,7 @@ private:
 
     bool try_reserve_for_lru(const UInt128Wrapper& hash, 
QueryFileCacheContextPtr query_context,
                              const CacheContext& context, size_t offset, 
size_t size,
-                             std::lock_guard<std::mutex>& cache_lock);
+                             std::lock_guard<std::mutex>& cache_lock, bool 
sync_removal = true);
 
     bool try_reserve_during_async_load(size_t size, 
std::lock_guard<std::mutex>& cache_lock);
 
@@ -403,7 +422,8 @@ private:
     std::vector<FileCacheType> get_other_cache_type_without_ttl(FileCacheType 
cur_cache_type);
 
     bool try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t 
offset, int64_t cur_time,
-                                      std::lock_guard<std::mutex>& cache_lock);
+                                      std::lock_guard<std::mutex>& cache_lock,
+                                      bool sync_removal = true);
 
     size_t get_available_cache_size(FileCacheType cache_type) const;
 
@@ -426,6 +446,7 @@ private:
                                         std::lock_guard<std::mutex>& 
cache_lock) const;
 
     void check_disk_resource_limit();
+    void check_need_evict_cache_in_advance();
 
     size_t get_available_cache_size_unlocked(FileCacheType type,
                                              std::lock_guard<std::mutex>& 
cache_lock) const;
@@ -441,15 +462,18 @@ private:
     void run_background_monitor();
     void run_background_ttl_gc();
     void run_background_gc();
+    void run_background_evict_in_advance();
 
     bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type,
                                                        
std::vector<FileCacheType> other_cache_types,
                                                        size_t size, int64_t 
cur_time,
-                                                       
std::lock_guard<std::mutex>& cache_lock);
+                                                       
std::lock_guard<std::mutex>& cache_lock,
+                                                       bool sync_removal);
 
     bool try_reserve_from_other_queue_by_size(FileCacheType cur_type,
                                               std::vector<FileCacheType> 
other_cache_types,
-                                              size_t size, 
std::lock_guard<std::mutex>& cache_lock);
+                                              size_t size, 
std::lock_guard<std::mutex>& cache_lock,
+                                              bool sync_removal);
 
     bool is_overflow(size_t removed_size, size_t need_size, size_t 
cur_cache_size) const;
 
@@ -476,9 +500,11 @@ private:
     std::thread _cache_background_monitor_thread;
     std::thread _cache_background_ttl_gc_thread;
     std::thread _cache_background_gc_thread;
+    std::thread _cache_background_evict_in_advance_thread;
     std::atomic_bool _async_open_done {false};
     // disk space or inode is less than the specified value
     bool _disk_resource_limit_mode {false};
+    bool _need_evict_cache_in_advance {false};
     bool _is_initialized {false};
 
     // strategy
@@ -536,12 +562,15 @@ private:
     std::shared_ptr<bvar::Status<double>> _hit_ratio_5m;
     std::shared_ptr<bvar::Status<double>> _hit_ratio_1h;
     std::shared_ptr<bvar::Status<size_t>> _disk_limit_mode_metrics;
+    std::shared_ptr<bvar::Status<size_t>> _need_evict_cache_in_advance_metrics;
 
     std::shared_ptr<bvar::LatencyRecorder> _cache_lock_wait_time_us;
     std::shared_ptr<bvar::LatencyRecorder> _get_or_set_latency_us;
     std::shared_ptr<bvar::LatencyRecorder> _storage_sync_remove_latency_us;
     std::shared_ptr<bvar::LatencyRecorder> 
_storage_retry_sync_remove_latency_us;
     std::shared_ptr<bvar::LatencyRecorder> _storage_async_remove_latency_us;
+    std::shared_ptr<bvar::LatencyRecorder> _evict_in_advance_latency_us;
+    std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder;
 };
 
 } // namespace doris::io
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index 117f01d63e3..e42b6516d58 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -139,6 +139,10 @@ class BlockFileCacheTest : public testing::Test {
 public:
     static void SetUpTestSuite() {
         config::file_cache_enter_disk_resource_limit_mode_percent = 99;
+        config::enable_evict_file_cache_in_advance = false; // disable evict in
+                                                            // advance for most
+                                                            // cases for simple
+                                                            // verification
         bool exists {false};
         ASSERT_TRUE(global_local_filesystem()->exists(caches_dir, 
&exists).ok());
         if (!exists) {
@@ -4402,7 +4406,7 @@ TEST_F(BlockFileCacheTest, 
test_check_disk_reource_limit_1) {
         std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
     std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    EXPECT_EQ(config::file_cache_enter_disk_resource_limit_mode_percent, 90);
+    EXPECT_EQ(config::file_cache_enter_disk_resource_limit_mode_percent, 88);
     EXPECT_EQ(config::file_cache_exit_disk_resource_limit_mode_percent, 80);
     config::file_cache_enter_disk_resource_limit_mode_percent = 99;
     if (fs::exists(cache_base_path)) {
@@ -6742,4 +6746,241 @@ TEST_F(BlockFileCacheTest, 
evict_privilege_order_for_ttl) {
     }
 }
 
+// Fills the NORMAL queue up to the cache capacity, then turns on evict-in-advance and expects
+// that one batch (two 100000-byte blocks) has been evicted by the background thread after a
+// two second wait.
+TEST_F(BlockFileCacheTest, evict_in_advance) {
+    // Put the evict-in-advance configs back when the test ends (also after a failed ASSERT),
+    // so that later tests still see the values chosen in SetUpTestSuite(). Declared first, so
+    // it is destroyed after `cache` below.
+    struct EvictInAdvanceConfigRestorer {
+        const bool saved_enable = config::enable_evict_file_cache_in_advance;
+        const int64_t saved_batch_bytes = config::file_cache_evict_in_advance_batch_bytes;
+        ~EvictInAdvanceConfigRestorer() {
+            config::enable_evict_file_cache_in_advance = saved_enable;
+            config::file_cache_evict_in_advance_batch_bytes = saved_batch_bytes;
+        }
+    } config_restorer;
+
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    // Make the background monitor re-check every second instead of every 20 seconds.
+    auto sp = SyncPoint::get_instance();
+    SyncPoint::CallbackGuard guard1;
+    sp->set_call_back(
+            "BlockFileCache::set_sleep_time",
+            [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; }, &guard1);
+    sp->enable_processing();
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+
+    settings.ttl_queue_size = 5000000;
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 3000000;
+    settings.query_queue_elements = 30000;
+    settings.index_queue_size = 1000000;
+    settings.index_queue_elements = 10000;
+    settings.disposable_queue_size = 1000000;
+    settings.disposable_queue_elements = 10000;
+    settings.capacity = 10000000;
+    settings.max_file_block_size = 100000;
+    settings.max_query_cache_size = 30;
+
+    size_t limit = 1000000;
+    size_t cache_max = 10000000;
+    io::CacheContext context;
+    ReadStatistics rstats;
+    context.stats = &rstats;
+    context.cache_type = io::FileCacheType::NORMAL;
+    context.query_id = query_id;
+    // int64_t cur_time = UnixSeconds();
+    // context.expiration_time = cur_time + 120;
+    auto key1 = io::BlockFileCache::hash("key1");
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+
+    // Wait (up to about 100 ms) until the cache has finished its asynchronous open.
+    int i = 0;
+    for (; i < 100; i++) {
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache.get_async_open_success());
+    int64_t offset = 0;
+    // fill the cache to its limit
+    for (; offset < limit; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    // grab more, beyond the limit and up to the max cache capacity
+    for (; offset < cache_max; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(3, blocks[0], io::FileBlock::Range(offset, offset + 99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(4, blocks[0], io::FileBlock::Range(offset, offset + 99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max);
+    ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(), 0);
+
+    // grab more, exceeding the cache capacity
+    size_t exceed = 2000000;
+    for (; offset < (cache_max + exceed); offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(5, blocks[0], io::FileBlock::Range(offset, offset + 99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(6, blocks[0], io::FileBlock::Range(offset, offset + 99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    // The NORMAL queue still holds exactly cache_max bytes after the extra requests.
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max);
+
+    // NOTE(review): the expectation of exactly one evicted batch depends on how the one-second
+    // monitor interval and the evict interval line up inside the two-second sleep - confirm
+    // this cannot flake.
+    config::file_cache_evict_in_advance_batch_bytes = 200000;     // evict 2 100000 blocks
+    config::enable_evict_file_cache_in_advance = true;            // enable evict in advance
+    std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // wait for clear
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max - 200000);
+
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
+// Drives check_need_evict_cache_in_advance() directly on caches that are never initialize()d:
+// memory storage, an unreadable path, each of the three "enter" conditions, the "exit"
+// condition and the fallback for inconsistent thresholds. Disk usage is faked through the
+// "BlockFileCache::disk_used_percentage:1" sync point where a callback is installed.
+TEST_F(BlockFileCacheTest, test_check_need_evict_cache_in_advance) {
+    // NOTE(review): this local has the same name as the cache_base_path used by the other
+    // tests in this file - confirm the different directory is intended.
+    std::string cache_base_path = "./ut_file_cache_dir";
+    fs::create_directories(cache_base_path);
+
+    io::FileCacheSettings settings;
+    settings.capacity = 100_mb;
+    settings.storage = "disk";
+    settings.query_queue_size = 50_mb;
+    settings.index_queue_size = 20_mb;
+    settings.disposable_queue_size = 20_mb;
+    settings.ttl_queue_size = 10_mb;
+
+    // this one for memory storage
+    {
+        settings.storage = "memory";
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_FALSE(cache._need_evict_cache_in_advance);
+        cache.check_need_evict_cache_in_advance();
+        ASSERT_FALSE(cache._need_evict_cache_in_advance);
+    }
+
+    // the rest for disk
+    settings.storage = "disk";
+
+    // bad disk path
+    {
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_FALSE(cache._need_evict_cache_in_advance);
+
+        cache._cache_base_path = "/non/existent/path/OOXXOO";
+        cache.check_need_evict_cache_in_advance();
+        ASSERT_FALSE(cache._need_evict_cache_in_advance);
+    }
+
+    // conditions for enter need evict cache in advance
+    {
+        io::BlockFileCache cache(cache_base_path, settings);
+        ASSERT_FALSE(cache._need_evict_cache_in_advance);
+
+        // condition1 space usage rate exceed threshold
+        // (these thresholds stay in effect for the following blocks as well)
+        config::file_cache_enter_need_evict_cache_in_advance_percent = 70;
+        config::file_cache_exit_need_evict_cache_in_advance_percent = 65;
+
+        SyncPoint::get_instance()->set_call_back(
+                "BlockFileCache::disk_used_percentage:1", 
[](std::vector<std::any>&& values) {
+                    auto* percent = try_any_cast<std::pair<int, 
int>*>(values.back());
+                    percent->first = 75; // set high
+                    percent->second = 60;
+                });
+
+        SyncPoint::get_instance()->enable_processing();
+        cache.check_need_evict_cache_in_advance();
+        ASSERT_TRUE(cache._need_evict_cache_in_advance);
+        SyncPoint::get_instance()->disable_processing();
+        SyncPoint::get_instance()->clear_all_call_backs();
+
+        // condition2 inode usage rate exceed threshold
+        cache._need_evict_cache_in_advance = false;
+
+        SyncPoint::get_instance()->set_call_back(
+                "BlockFileCache::disk_used_percentage:1", 
[](std::vector<std::any>&& values) {
+                    auto* percent = try_any_cast<std::pair<int, 
int>*>(values.back());
+                    percent->first = 60;
+                    percent->second = 75; // set high
+                });
+
+        SyncPoint::get_instance()->enable_processing();
+        cache.check_need_evict_cache_in_advance();
+        ASSERT_TRUE(cache._need_evict_cache_in_advance);
+        SyncPoint::get_instance()->disable_processing();
+        SyncPoint::get_instance()->clear_all_call_backs();
+
+        // condition3 cache size usage rate exceed threshold
+        // (no callback is installed here; 80_mb of the 100_mb capacity is above the
+        // threshold of 70 on its own)
+        cache._need_evict_cache_in_advance = false;
+        cache._cur_cache_size = 80_mb; // set high
+        cache.check_need_evict_cache_in_advance();
+        ASSERT_TRUE(cache._need_evict_cache_in_advance);
+    }
+
+    // conditions for exit need evict cache in advance
+    // (relies on the enter/exit thresholds 70/65 set in the block above)
+    {
+        io::BlockFileCache cache(cache_base_path, settings);
+        cache._need_evict_cache_in_advance = true;
+        cache._cur_cache_size = 50_mb; // set low
+
+        SyncPoint::get_instance()->set_call_back(
+                "BlockFileCache::disk_used_percentage:1", 
[](std::vector<std::any>&& values) {
+                    auto* percent = try_any_cast<std::pair<int, 
int>*>(values.back());
+                    percent->first = 50;  // set low
+                    percent->second = 50; // set low
+                });
+
+        SyncPoint::get_instance()->enable_processing();
+        cache.check_need_evict_cache_in_advance();
+        ASSERT_FALSE(cache._need_evict_cache_in_advance);
+        SyncPoint::get_instance()->disable_processing();
+        SyncPoint::get_instance()->clear_all_call_backs();
+    }
+
+    // config parameter validation
+    {
+        io::BlockFileCache cache(cache_base_path, settings);
+
+        // set wrong config value
+        config::file_cache_enter_need_evict_cache_in_advance_percent = 70;
+        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
+
+        cache.check_need_evict_cache_in_advance();
+
+        // reset to default value
+        // (this also leaves both thresholds at 78/75 when the test ends)
+        
ASSERT_EQ(config::file_cache_enter_need_evict_cache_in_advance_percent, 78);
+        ASSERT_EQ(config::file_cache_exit_need_evict_cache_in_advance_percent, 
75);
+    }
+
+    fs::remove_all(cache_base_path);
+}
+
 } // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to