This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c540f5e0e9 [fix](cloud) fix file cache potential leakage (#46561)
4c540f5e0e9 is described below

commit 4c540f5e0e9c6cf78c3b6ada59b95e7b8563a928
Author: zhengyu <zhangzhen...@selectdb.com>
AuthorDate: Thu Jan 23 19:18:53 2025 +0800

    [fix](cloud) fix file cache potential leakage (#46561)
    
    将原来由"同步/异步删除 cache meta"与"同步/异步删除 cache data file"组合而成的多维度删除策略降维简化:
    所有 cache meta 都同步删除(正在使用的除外,该 case 的处理方式见下文);data file 在 critical
    场景下同步删除,在 gc 场景下异步删除。
    
    异步清理调度的优化:
    - 之前的调度逻辑会提前中断,导致清理效率低下
    - 甚至调度会有概率进入某些状态导致清理无法继续进行
    - 优化 CPU 使用,避免额外无效队列遍历
    - 增加窗口算法对异步删除 data file 进行 qps 限制
    
    优化标记删除:
    - 之前的标记删除机制对 TTL data file 有两个方面的空间泄漏问题
    - 扩展应用场景:从原来只能用于 clear_cache、reset_capacity 缩容,扩展到任意异步删除的场景
    - 新的标记删除机制除了应用于正在引用的数据之外,还解决了 DOWNLOADING 状态数据的删除泄漏问题
    
    fix 删除正在引用的数据过程的多处泄漏:
    - 之前没有机制对正在引用的数据进行标记删除,只能放任不管(直接跳过,不予删除)
    - 现在配合优化后的标记删除机制,使用析构函数在释放引用后自动删除
    
    发现并修复队列操作中存在的内存写飞隐患:
    - reset_capacity 在迭代过程中 erase 容器条目,可能会导致指针悬空
    
    其它小优化:
    - 使用 concurrentqueue 代替之前的静态无锁队列:在保持性能的同时,减少因队列满而退化为同步删文件所带来的
      IO burst 及伴随的 cache lock 开销
    - 清理弃用的 file_cache_ttl_valid_check_interval_second 配置:现在 ttl 已支持 LRU,
      不需要额外定时清理
    - 多线程拆分:避免 metrics、resource limit、data file 清理、ttl 超时清理 相互影响
    
    Signed-off-by: zhengyu <zhangzhen...@selectdb.com>
---
 be/src/cloud/cloud_storage_engine.cpp      |  31 ---
 be/src/cloud/cloud_storage_engine.h        |   1 -
 be/src/cloud/cloud_tablet_mgr.cpp          |   4 +-
 be/src/common/config.cpp                   |   3 +-
 be/src/common/config.h                     |   4 +-
 be/src/io/cache/block_file_cache.cpp       | 317 +++++++++++++----------------
 be/src/io/cache/block_file_cache.h         |  40 ++--
 be/src/io/cache/file_block.cpp             |   8 +-
 be/src/io/cache/file_block.h               |   5 +
 be/src/olap/compaction.cpp                 |   2 +-
 be/test/io/cache/block_file_cache_test.cpp | 108 ++--------
 11 files changed, 196 insertions(+), 327 deletions(-)

diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 650909a2915..766f83563f7 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -300,42 +300,11 @@ Status 
CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sp
             "StorageEngine", "lease_compaction_thread",
             [this]() { this->_lease_compaction_thread_callback(); }, 
&_bg_threads.emplace_back()));
 
-    if (config::file_cache_ttl_valid_check_interval_second != 0) {
-        RETURN_IF_ERROR(Thread::create(
-                "StorageEngine", "check_file_cache_ttl_block_valid_thread",
-                [this]() { this->_check_file_cache_ttl_block_valid(); },
-                &_bg_threads.emplace_back()));
-        LOG(INFO) << "check file cache ttl block valid thread started";
-    }
-
     LOG(INFO) << "lease compaction thread started";
 
     return Status::OK();
 }
 
-void CloudStorageEngine::_check_file_cache_ttl_block_valid() {
-    int64_t interval_seconds = 
config::file_cache_ttl_valid_check_interval_second / 2;
-    auto check_ttl = [](const std::weak_ptr<CloudTablet>& tablet_wk) {
-        auto tablet = tablet_wk.lock();
-        if (!tablet) return;
-        if (tablet->tablet_meta()->ttl_seconds() == 0) return;
-        auto rowsets = tablet->get_snapshot_rowset();
-        for (const auto& rowset : rowsets) {
-            int64_t ttl_seconds = tablet->tablet_meta()->ttl_seconds();
-            if (rowset->newest_write_timestamp() + ttl_seconds <= 
UnixSeconds()) continue;
-            for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); 
seg_id++) {
-                auto hash = 
Segment::file_cache_key(rowset->rowset_id().to_string(), seg_id);
-                auto* file_cache = 
io::FileCacheFactory::instance()->get_by_path(hash);
-                file_cache->update_ttl_atime(hash);
-            }
-        }
-    };
-    while 
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval_seconds)))
 {
-        auto weak_tablets = tablet_mgr().get_weak_tablets();
-        std::for_each(weak_tablets.begin(), weak_tablets.end(), check_ttl);
-    }
-}
-
 void CloudStorageEngine::sync_storage_vault() {
     cloud::StorageVaultInfos vault_infos;
     bool enable_storage_vault = false;
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index 2cd47c52dbe..34bde2e75f7 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -72,7 +72,6 @@ public:
     ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
         return *_calc_tablet_delete_bitmap_task_thread_pool;
     }
-    void _check_file_cache_ttl_block_valid();
 
     std::optional<StorageResource> get_storage_resource(const std::string& 
vault_id) {
         VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id;
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp 
b/be/src/cloud/cloud_tablet_mgr.cpp
index f60d0eeb5ba..d597ccc42a1 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -238,7 +238,9 @@ void CloudTabletMgr::vacuum_stale_rowsets(const 
CountDownLatch& stop_latch) {
 
         num_vacuumed += t->delete_expired_stale_rowsets();
     }
-    LOG_INFO("finish vacuum stale rowsets").tag("num_vacuumed", num_vacuumed);
+    LOG_INFO("finish vacuum stale rowsets")
+            .tag("num_vacuumed", num_vacuumed)
+            .tag("num_tablets", tablets_to_vacuum.size());
 }
 
 std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() {
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2ffd1085345..39549d18986 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1050,7 +1050,6 @@ 
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88");
 DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
 DEFINE_mBool(enable_read_cache_file_directly, "false");
 DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
-DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for 
not checking
 // If true, evict the ttl cache using LRU when full.
 // Otherwise, only expiration can evict ttl and new data won't add to cache 
when full.
 DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
@@ -1058,8 +1057,8 @@ DEFINE_mBool(enbale_dump_error_file, "false");
 // limit the max size of error log on disk
 DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB
 DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
-DEFINE_Int64(file_cache_recycle_keys_size, "1000000");
 DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
+DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
 
 DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
 DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 6c84d89faba..7e177bb7236 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1090,7 +1090,6 @@ 
DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
 DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
 DECLARE_mBool(enable_read_cache_file_directly);
 DECLARE_Bool(file_cache_enable_evict_from_other_queue_by_size);
-DECLARE_mInt64(file_cache_ttl_valid_check_interval_second);
 // If true, evict the ttl cache using LRU when full.
 // Otherwise, only expiration can evict ttl and new data won't add to cache 
when full.
 DECLARE_Bool(enable_ttl_cache_evict_using_lru);
@@ -1098,7 +1097,6 @@ DECLARE_mBool(enbale_dump_error_file);
 // limit the max size of error log on disk
 DECLARE_mInt64(file_cache_error_log_limit_bytes);
 DECLARE_mInt64(cache_lock_long_tail_threshold);
-DECLARE_Int64(file_cache_recycle_keys_size);
 // Base compaction may retrieve and produce some less frequently accessed data,
 // potentially affecting the file cache hit rate.
 // This configuration determines whether to retain the output within the file 
cache.
@@ -1106,7 +1104,7 @@ DECLARE_Int64(file_cache_recycle_keys_size);
 // If your file cache is ample enough to accommodate all the data in your 
database,
 // enable this option; otherwise, it is recommended to leave it disabled.
 DECLARE_mBool(enable_file_cache_keep_base_compaction_output);
-
+DECLARE_mInt64(file_cache_remove_block_qps_limit);
 // inverted index searcher cache
 // cache entry stay time after lookup
 DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index b5f48d09648..59bb1becb5d 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -202,6 +202,11 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
     _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
             _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
 
+    _storage_sync_remove_latency = std::make_shared<bvar::LatencyRecorder>(
+            _cache_base_path.c_str(), 
"file_cache_storage_sync_remove_latency_ns");
+    _storage_async_remove_latency = std::make_shared<bvar::LatencyRecorder>(
+            _cache_base_path.c_str(), 
"file_cache_storage_async_remove_latency_ns");
+
     _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                  cache_settings.disposable_queue_elements, 60 
* 60);
     _index_queue = LRUQueue(cache_settings.index_queue_size, 
cache_settings.index_queue_elements,
@@ -210,9 +215,6 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
                              24 * 60 * 60);
     _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, 
cache_settings.ttl_queue_elements,
                           std::numeric_limits<int>::max());
-
-    _recycle_keys = 
std::make_shared<boost::lockfree::spsc_queue<FileCacheKey>>(
-            config::file_cache_recycle_keys_size);
     if (cache_settings.storage == "memory") {
         _storage = std::make_unique<MemFileCacheStorage>();
         _cache_base_path = "memory";
@@ -328,7 +330,9 @@ Status 
BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
     DCHECK(!_is_initialized);
     _is_initialized = true;
     RETURN_IF_ERROR(_storage->init(this));
-    _cache_background_thread = 
std::thread(&BlockFileCache::run_background_operation, this);
+    _cache_background_monitor_thread = 
std::thread(&BlockFileCache::run_background_monitor, this);
+    _cache_background_ttl_gc_thread = 
std::thread(&BlockFileCache::run_background_ttl_gc, this);
+    _cache_background_gc_thread = 
std::thread(&BlockFileCache::run_background_gc, this);
 
     return Status::OK();
 }
@@ -346,7 +350,6 @@ void BlockFileCache::use_cell(const FileBlockCell& cell, 
FileBlocks* result, boo
     }
 
     cell.update_atime();
-    cell.is_deleted = false;
 }
 
 template <class T>
@@ -534,130 +537,48 @@ std::string BlockFileCache::clear_file_cache_async() {
     LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
     int64_t num_cells_all = 0;
     int64_t num_cells_to_delete = 0;
+    int64_t num_cells_wait_recycle = 0;
     int64_t num_files_all = 0;
+    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
     {
         SCOPED_CACHE_LOCK(_mutex);
-        if (!_async_clear_file_cache) {
-            for (auto& [_, offset_to_cell] : _files) {
-                ++num_files_all;
-                for (auto& [_, cell] : offset_to_cell) {
-                    ++num_cells_all;
-                    if (cell.releasable()) {
-                        cell.is_deleted = true;
-                        ++num_cells_to_delete;
-                    }
-                }
+
+        std::vector<FileBlockCell*> deleting_cells;
+        for (auto& [_, offset_to_cell] : _files) {
+            ++num_files_all;
+            for (auto& [_, cell] : offset_to_cell) {
+                ++num_cells_all;
+                deleting_cells.push_back(&cell);
+            }
+        }
+
+        // we cannot delete the element in the loop above, because it will 
break the iterator
+        for (auto& cell : deleting_cells) {
+            if (!cell->releasable()) {
+                LOG(INFO) << "cell is not releasable, hash="
+                          << " offset=" << cell->file_block->offset();
+                cell->file_block->set_deleting();
+                ++num_cells_wait_recycle;
+                continue;
+            }
+            FileBlockSPtr file_block = cell->file_block;
+            if (file_block) {
+                std::lock_guard block_lock(file_block->_mutex);
+                remove(file_block, cache_lock, block_lock, false);
+                ++num_cells_to_delete;
             }
-            _async_clear_file_cache = true;
         }
     }
     std::stringstream ss;
     ss << "finish clear_file_cache_async, path=" << _cache_base_path
        << " num_files_all=" << num_files_all << " num_cells_all=" << 
num_cells_all
-       << " num_cells_to_delete=" << num_cells_to_delete;
+       << " num_cells_to_delete=" << num_cells_to_delete
+       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
     auto msg = ss.str();
     LOG(INFO) << msg;
     return msg;
 }
 
-void BlockFileCache::recycle_deleted_blocks() {
-    using namespace std::chrono;
-    static int remove_batch = 100;
-    TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch", 
&remove_batch);
-    TEST_SYNC_POINT_CALLBACK("BlockFileCache::recycle_deleted_blocks");
-    std::unique_lock cache_lock(_mutex);
-    auto remove_file_block = [&cache_lock, this](FileBlockCell* cell) {
-        std::lock_guard segment_lock(cell->file_block->_mutex);
-        remove(cell->file_block, cache_lock, segment_lock);
-    };
-    int i = 0;
-    std::condition_variable cond;
-    auto start_time = steady_clock::time_point();
-    if (_async_clear_file_cache) {
-        LOG_INFO("Start clear file cache async").tag("path", _cache_base_path);
-        auto remove_file_block = [&cache_lock, this](FileBlockCell* cell) {
-            std::lock_guard segment_lock(cell->file_block->_mutex);
-            remove(cell->file_block, cache_lock, segment_lock);
-        };
-        static int remove_batch = 100;
-        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch", 
&remove_batch);
-        int i = 0;
-        std::condition_variable cond;
-        auto iter_queue = [&](LRUQueue& queue) {
-            bool end = false;
-            while (queue.get_capacity(cache_lock) != 0 && !end) {
-                std::vector<FileBlockCell*> cells;
-                for (const auto& [entry_key, entry_offset, _] : queue) {
-                    if (i == remove_batch) {
-                        i = 0;
-                        break;
-                    }
-                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
-                    if (!cell) continue;
-                    if (!cell->is_deleted) {
-                        end = true;
-                        break;
-                    } else if (cell->releasable()) {
-                        i++;
-                        cells.push_back(cell);
-                    }
-                }
-                std::ranges::for_each(cells, remove_file_block);
-                // just for sleep
-                cond.wait_for(cache_lock, std::chrono::microseconds(100));
-            }
-        };
-        iter_queue(get_queue(FileCacheType::DISPOSABLE));
-        iter_queue(get_queue(FileCacheType::NORMAL));
-        iter_queue(get_queue(FileCacheType::INDEX));
-    }
-    if (_async_clear_file_cache || 
config::file_cache_ttl_valid_check_interval_second != 0) {
-        std::vector<UInt128Wrapper> ttl_keys;
-        ttl_keys.reserve(_key_to_time.size());
-        for (auto& [key, _] : _key_to_time) {
-            ttl_keys.push_back(key);
-        }
-        for (UInt128Wrapper& hash : ttl_keys) {
-            if (i >= remove_batch) {
-                // just for sleep
-                cond.wait_for(cache_lock, std::chrono::microseconds(100));
-                i = 0;
-            }
-            if (auto iter = _files.find(hash); iter != _files.end()) {
-                std::vector<FileBlockCell*> cells;
-                cells.reserve(iter->second.size());
-                for (auto& [_, cell] : iter->second) {
-                    cell.is_deleted =
-                            cell.is_deleted
-                                    ? true
-                                    : 
(config::file_cache_ttl_valid_check_interval_second == 0
-                                               ? false
-                                               : 
std::chrono::duration_cast<std::chrono::seconds>(
-                                                         
std::chrono::steady_clock::now()
-                                                                 
.time_since_epoch())
-                                                                         
.count() -
-                                                                 cell.atime >
-                                                         
config::file_cache_ttl_valid_check_interval_second);
-                    if (!cell.is_deleted) {
-                        continue;
-                    } else if (cell.releasable()) {
-                        cells.emplace_back(&cell);
-                        i++;
-                    }
-                }
-                std::ranges::for_each(cells, remove_file_block);
-            }
-        }
-        if (_async_clear_file_cache) {
-            _async_clear_file_cache = false;
-            auto use_time = 
duration_cast<milliseconds>(steady_clock::time_point() - start_time);
-            LOG_INFO("End clear file cache async")
-                    .tag("path", _cache_base_path)
-                    .tag("use_time", static_cast<int64_t>(use_time.count()));
-        }
-    }
-}
-
 FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                   const CacheContext& context, 
size_t offset,
                                                   size_t size, 
FileBlock::State state,
@@ -866,6 +787,8 @@ size_t BlockFileCache::try_release() {
         for (auto& [offset, cell] : blocks) {
             if (cell.releasable()) {
                 trash.emplace_back(&cell);
+            } else {
+                cell.file_block->set_deleting();
             }
         }
     }
@@ -914,24 +837,12 @@ const BlockFileCache::LRUQueue& 
BlockFileCache::get_queue(FileCacheType type) co
 }
 
 void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
-                                        std::lock_guard<std::mutex>& 
cache_lock) {
+                                        std::lock_guard<std::mutex>& 
cache_lock, bool sync) {
     auto remove_file_block_if = [&](FileBlockCell* cell) {
         FileBlockSPtr file_block = cell->file_block;
         if (file_block) {
             std::lock_guard block_lock(file_block->_mutex);
-            remove(file_block, cache_lock, block_lock);
-        }
-    };
-    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
-}
-
-void BlockFileCache::remove_file_blocks_async(std::vector<FileBlockCell*>& 
to_evict,
-                                              std::lock_guard<std::mutex>& 
cache_lock) {
-    auto remove_file_block_if = [&](FileBlockCell* cell) {
-        FileBlockSPtr file_block = cell->file_block;
-        if (file_block) {
-            std::lock_guard block_lock(file_block->_mutex);
-            remove(file_block, cache_lock, block_lock, /*sync*/ false);
+            remove(file_block, cache_lock, block_lock, sync);
         }
     };
     std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
@@ -1100,8 +1011,8 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& 
hash, const CacheContext&
     return true;
 }
 
-bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, 
bool remove_directly,
-                                               std::lock_guard<std::mutex>& 
cache_lock) {
+bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, 
bool remove_directly,
+                                               std::lock_guard<std::mutex>& 
cache_lock, bool sync) {
     auto& ttl_queue = get_queue(FileCacheType::TTL);
     if (auto iter = _key_to_time.find(file_key);
         _key_to_time.find(file_key) != _key_to_time.end()) {
@@ -1136,13 +1047,13 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const 
UInt128Wrapper& file_key, b
                 if (cell.releasable()) {
                     to_remove.push_back(&cell);
                 } else {
-                    cell.is_deleted = true;
+                    cell.file_block->set_deleting();
                 }
             }
             std::for_each(to_remove.begin(), to_remove.end(), 
[&](FileBlockCell* cell) {
                 FileBlockSPtr file_block = cell->file_block;
                 std::lock_guard block_lock(file_block->_mutex);
-                remove(file_block, cache_lock, block_lock);
+                remove(file_block, cache_lock, block_lock, sync);
             });
         }
         // remove from _time_to_key
@@ -1161,9 +1072,11 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const 
UInt128Wrapper& file_key, b
     return false;
 }
 
+// remove specific cache synchronously, for critical operations
+// if in use, cache meta will be deleted after use and the block file is then 
deleted asynchronously
 void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
     SCOPED_CACHE_LOCK(_mutex);
-    bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock);
+    bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, 
true);
     if (!is_ttl_file) {
         auto iter = _files.find(file_key);
         std::vector<FileBlockCell*> to_remove;
@@ -1171,16 +1084,21 @@ void BlockFileCache::remove_if_cached(const 
UInt128Wrapper& file_key) {
             for (auto& [_, cell] : iter->second) {
                 if (cell.releasable()) {
                     to_remove.push_back(&cell);
+                } else {
+                    cell.file_block->set_deleting();
                 }
             }
         }
-        remove_file_blocks(to_remove, cache_lock);
+        remove_file_blocks(to_remove, cache_lock, true);
     }
 }
 
+// the async version of remove_if_cached, for background operations
+// cache meta is deleted synchronously if not in use, and the block file is 
deleted asynchronously
+// if in use, cache meta will be deleted after use and the block file is then 
deleted asynchronously
 void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
     SCOPED_CACHE_LOCK(_mutex);
-    bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock);
+    bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, 
/*sync*/ false);
     if (!is_ttl_file) {
         auto iter = _files.find(file_key);
         std::vector<FileBlockCell*> to_remove;
@@ -1188,10 +1106,12 @@ void BlockFileCache::remove_if_cached_async(const 
UInt128Wrapper& file_key) {
             for (auto& [_, cell] : iter->second) {
                 if (cell.releasable()) {
                     to_remove.push_back(&cell);
+                } else {
+                    cell.file_block->set_deleting();
                 }
             }
         }
-        remove_file_blocks_async(to_remove, cache_lock);
+        remove_file_blocks(to_remove, cache_lock, false);
     }
 }
 
@@ -1281,7 +1201,7 @@ bool 
BlockFileCache::try_reserve_from_other_queue_by_time_interval(
         }
         *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << 
remove_size_per_type;
     }
-    remove_file_blocks(to_evict, cache_lock);
+    remove_file_blocks(to_evict, cache_lock, true);
 
     return !is_overflow(removed_size, size, cur_cache_size);
 }
@@ -1319,7 +1239,7 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size(
                               cur_removed_size);
         *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << 
cur_removed_size;
     }
-    remove_file_blocks(to_evict, cache_lock);
+    remove_file_blocks(to_evict, cache_lock, true);
     return !is_overflow(removed_size, size, cur_cache_size);
 }
 
@@ -1362,7 +1282,7 @@ bool BlockFileCache::try_reserve_for_lru(const 
UInt128Wrapper& hash,
         size_t cur_removed_size = 0;
         find_evict_candidates(queue, size, cur_cache_size, removed_size, 
to_evict, cache_lock,
                               cur_removed_size);
-        remove_file_blocks(to_evict, cache_lock);
+        remove_file_blocks(to_evict, cache_lock, true);
         *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << 
cur_removed_size;
 
         if (is_overflow(removed_size, size, cur_cache_size)) {
@@ -1385,6 +1305,7 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& 
cache_lock, U& block_lo
     auto expiration_time = file_block->expiration_time();
     auto* cell = get_cell(hash, offset, cache_lock);
     DCHECK(cell);
+    DCHECK(cell->queue_iterator);
     if (cell->queue_iterator) {
         auto& queue = get_queue(file_block->cache_type());
         queue.remove(*cell->queue_iterator, cache_lock);
@@ -1392,14 +1313,17 @@ void BlockFileCache::remove(FileBlockSPtr file_block, 
T& cache_lock, U& block_lo
     *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
             << file_block->range().size();
     *_total_evict_size_metrics << file_block->range().size();
-    if (cell->file_block->state_unlock(block_lock) == 
FileBlock::State::DOWNLOADED) {
+    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
         FileCacheKey key;
         key.hash = hash;
         key.offset = offset;
         key.meta.type = type;
         key.meta.expiration_time = expiration_time;
         if (sync) {
+            int64_t duration_ns = 0;
+            SCOPED_RAW_TIMER(&duration_ns);
             Status st = _storage->remove(key);
+            *_storage_sync_remove_latency << duration_ns;
             if (!st.ok()) {
                 LOG_WARNING("").error(st);
             }
@@ -1407,16 +1331,21 @@ void BlockFileCache::remove(FileBlockSPtr file_block, 
T& cache_lock, U& block_lo
             // the file will be deleted in the bottom half
             // so there will be a window that the file is not in the cache but 
still in the storage
             // but it's ok, because the rowset is stale already
-            // in case something unexpected happen, set the _recycle_keys 
queue to zero to fallback
-            bool ret = _recycle_keys->push(key);
+            bool ret = _recycle_keys.enqueue(key);
             if (!ret) {
                 LOG_WARNING("Failed to push recycle key to queue, do it 
synchronously");
+                int64_t duration_ns = 0;
+                SCOPED_RAW_TIMER(&duration_ns);
                 Status st = _storage->remove(key);
+                *_storage_sync_remove_latency << duration_ns;
                 if (!st.ok()) {
                     LOG_WARNING("").error(st);
                 }
             }
         }
+    } else if (file_block->state_unlock(block_lock) == 
FileBlock::State::DOWNLOADING) {
+        file_block->set_deleting();
+        return;
     }
     _cur_cache_size -= file_block->range().size();
     if (FileCacheType::TTL == type) {
@@ -1430,16 +1359,6 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& 
cache_lock, U& block_lo
     *_num_removed_blocks << 1;
 }
 
-void BlockFileCache::recycle_stale_rowset_async_bottom_half() {
-    FileCacheKey key;
-    while (_recycle_keys->pop(key)) {
-        Status st = _storage->remove(key);
-        if (!st.ok()) {
-            LOG_WARNING("").error(st);
-        }
-    }
-}
-
 size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
     SCOPED_CACHE_LOCK(_mutex);
     return get_used_cache_size_unlocked(cache_type, cache_lock);
@@ -1632,14 +1551,25 @@ std::string BlockFileCache::reset_capacity(size_t 
new_capacity) {
             int64_t need_remove_size = _cur_cache_size - new_capacity;
             auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
                 int64_t queue_released = 0;
+                std::vector<FileBlockCell*> to_evict;
                 for (const auto& [entry_key, entry_offset, entry_size] : 
queue) {
-                    if (need_remove_size <= 0) return queue_released;
-                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
-                    if (!cell->releasable()) continue;
-                    cell->is_deleted = true;
+                    if (need_remove_size <= 0) {
+                        break;
+                    }
                     need_remove_size -= entry_size;
                     space_released += entry_size;
                     queue_released += entry_size;
+                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
+                    if (!cell->releasable()) {
+                        cell->file_block->set_deleting();
+                        continue;
+                    }
+                    to_evict.push_back(cell);
+                }
+                for (auto& cell : to_evict) {
+                    FileBlockSPtr file_block = cell->file_block;
+                    std::lock_guard block_lock(file_block->_mutex);
+                    remove(file_block, cache_lock, block_lock);
                 }
                 return queue_released;
             };
@@ -1649,22 +1579,11 @@ std::string BlockFileCache::reset_capacity(size_t 
new_capacity) {
             ss << " normal_queue released " << queue_released;
             queue_released = remove_blocks(_index_queue);
             ss << " index_queue released " << queue_released;
-            if (need_remove_size >= 0) {
-                queue_released = 0;
-                for (auto& [_, key] : _time_to_key) {
-                    for (auto& [_, cell] : _files[key]) {
-                        if (need_remove_size <= 0) break;
-                        cell.is_deleted = true;
-                        need_remove_size -= cell.file_block->range().size();
-                        space_released += cell.file_block->range().size();
-                        queue_released += cell.file_block->range().size();
-                    }
-                }
-                ss << " ttl_queue released " << queue_released;
-            }
+            queue_released = remove_blocks(_ttl_queue);
+            ss << " ttl_queue released " << queue_released;
+
             _disk_resource_limit_mode = true;
             _disk_limit_mode_metrics->set_value(1);
-            _async_clear_file_cache = true;
             ss << " total_space_released=" << space_released;
         }
         old_capacity = _capacity;
@@ -1729,7 +1648,7 @@ void BlockFileCache::check_disk_resource_limit() {
     }
 }
 
-void BlockFileCache::run_background_operation() {
+void BlockFileCache::run_background_monitor() {
     int64_t interval_time_seconds = 20;
     while (!_close) {
         TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", 
&interval_time_seconds);
@@ -1777,10 +1696,20 @@ void BlockFileCache::run_background_operation() {
                                          _num_read_blocks_1h->get_value());
             }
         }
+    }
+}
 
-        recycle_stale_rowset_async_bottom_half();
-        recycle_deleted_blocks();
-        // gc
+void BlockFileCache::run_background_ttl_gc() { // TODO(zhengyu): fix!
+    int64_t interval_time_seconds = 20;
+    while (!_close) {
+        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", 
&interval_time_seconds);
+        {
+            std::unique_lock close_lock(_close_mtx);
+            _close_cv.wait_for(close_lock, 
std::chrono::seconds(interval_time_seconds));
+            if (_close) {
+                break;
+            }
+        }
         {
             int64_t cur_time = UnixSeconds();
             SCOPED_CACHE_LOCK(_mutex);
@@ -1789,9 +1718,41 @@ void BlockFileCache::run_background_operation() {
                 if (cur_time < begin->first) {
                     break;
                 }
-                remove_if_ttl_file_unlock(begin->second, false, cache_lock);
+                remove_if_ttl_file_blocks(begin->second, false, cache_lock, 
false);
+            }
+        }
+    }
+}
+
+void BlockFileCache::run_background_gc() {
+    FileCacheKey key;
+    static const size_t interval_ms = 100;
+    const size_t batch_limit = config::file_cache_remove_block_qps_limit * 
interval_ms / 1000;
+    size_t batch_count = 0;
+    while (!_close) {
+        {
+            std::unique_lock close_lock(_close_mtx);
+            _close_cv.wait_for(close_lock, 
std::chrono::milliseconds(interval_ms));
+            if (_close) {
+                break;
+            }
+        }
+        while (_recycle_keys.try_dequeue(key)) {
+            if (batch_count >= batch_limit) {
+                break;
+            }
+
+            int64_t duration_ns = 0;
+            SCOPED_RAW_TIMER(&duration_ns);
+            Status st = _storage->remove(key);
+            *_storage_async_remove_latency << duration_ns;
+
+            if (!st.ok()) {
+                LOG_WARNING("").error(st);
             }
+            batch_count++;
         }
+        batch_count = 0;
     }
 }
 
@@ -1800,7 +1761,7 @@ void BlockFileCache::modify_expiration_time(const 
UInt128Wrapper& hash,
     SCOPED_CACHE_LOCK(_mutex);
     // 1. If new_expiration_time is equal to zero
     if (new_expiration_time == 0) {
-        remove_if_ttl_file_unlock(hash, false, cache_lock);
+        remove_if_ttl_file_blocks(hash, false, cache_lock, false);
         return;
     }
     // 2. If the hash in ttl cache, modify its expiration time.
@@ -1917,7 +1878,7 @@ bool BlockFileCache::try_reserve_during_async_load(size_t 
size,
     if (index_queue_size != 0) {
         collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
     }
-    remove_file_blocks(to_evict, cache_lock);
+    remove_file_blocks(to_evict, cache_lock, true);
 
     return !_disk_resource_limit_mode || removed_size >= size;
 }
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index f23d5a3799e..f93a72cbc62 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <bvar/bvar.h>
+#include <concurrentqueue.h>
 
 #include <boost/lockfree/spsc_queue.hpp>
 #include <memory>
@@ -31,7 +32,7 @@
 #include "util/threadpool.h"
 
 namespace doris::io {
-
+using RecycleFileCacheKeys = moodycamel::ConcurrentQueue<FileCacheKey>;
 // Note: the cache_lock is scoped, so do not add do...while(0) here.
 #ifdef ENABLE_CACHE_LOCK_DEBUG
 #define SCOPED_CACHE_LOCK(MUTEX)                                               
                   \
@@ -95,8 +96,14 @@ public:
             _close = true;
         }
         _close_cv.notify_all();
-        if (_cache_background_thread.joinable()) {
-            _cache_background_thread.join();
+        if (_cache_background_monitor_thread.joinable()) {
+            _cache_background_monitor_thread.join();
+        }
+        if (_cache_background_ttl_gc_thread.joinable()) {
+            _cache_background_ttl_gc_thread.join();
+        }
+        if (_cache_background_gc_thread.joinable()) {
+            _cache_background_gc_thread.join();
         }
     }
 
@@ -336,7 +343,6 @@ private:
         std::optional<LRUQueue::Iterator> queue_iterator;
 
         mutable int64_t atime {0};
-        mutable bool is_deleted {false};
         void update_atime() const {
             atime = std::chrono::duration_cast<std::chrono::seconds>(
                             
std::chrono::steady_clock::now().time_since_epoch())
@@ -425,12 +431,12 @@ private:
 
     bool need_to_move(FileCacheType cell_type, FileCacheType query_type) const;
 
-    bool remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, bool 
remove_directly,
-                                   std::lock_guard<std::mutex>&);
-
-    void run_background_operation();
+    bool remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, bool 
remove_directly,
+                                   std::lock_guard<std::mutex>&, bool sync);
 
-    void recycle_deleted_blocks();
+    void run_background_monitor();
+    void run_background_ttl_gc();
+    void run_background_gc();
 
     bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type,
                                                        
std::vector<FileCacheType> other_cache_types,
@@ -443,9 +449,7 @@ private:
 
     bool is_overflow(size_t removed_size, size_t need_size, size_t 
cur_cache_size) const;
 
-    void remove_file_blocks(std::vector<FileBlockCell*>&, 
std::lock_guard<std::mutex>&);
-
-    void remove_file_blocks_async(std::vector<FileBlockCell*>&, 
std::lock_guard<std::mutex>&);
+    void remove_file_blocks(std::vector<FileBlockCell*>&, 
std::lock_guard<std::mutex>&, bool sync);
 
     void remove_file_blocks_and_clean_time_maps(std::vector<FileBlockCell*>&,
                                                 std::lock_guard<std::mutex>&);
@@ -454,8 +458,6 @@ private:
                                size_t& removed_size, 
std::vector<FileBlockCell*>& to_evict,
                                std::lock_guard<std::mutex>& cache_lock, 
size_t& cur_removed_size);
 
-    void recycle_stale_rowset_async_bottom_half();
-
     // info
     std::string _cache_base_path;
     size_t _capacity = 0;
@@ -467,9 +469,10 @@ private:
     bool _close {false};
     std::mutex _close_mtx;
     std::condition_variable _close_cv;
-    std::thread _cache_background_thread;
+    std::thread _cache_background_monitor_thread;
+    std::thread _cache_background_ttl_gc_thread;
+    std::thread _cache_background_gc_thread;
     std::atomic_bool _async_open_done {false};
-    bool _async_clear_file_cache {false};
     // disk space or inode is less than the specified value
     bool _disk_resource_limit_mode {false};
     bool _is_initialized {false};
@@ -495,7 +498,7 @@ private:
     LRUQueue _ttl_queue;
 
     // keys for async remove
-    std::shared_ptr<boost::lockfree::spsc_queue<FileCacheKey>> _recycle_keys;
+    RecycleFileCacheKeys _recycle_keys;
 
     // metrics
     std::shared_ptr<bvar::Status<size_t>> _cache_capacity_metrics;
@@ -529,6 +532,9 @@ private:
     std::shared_ptr<bvar::Status<double>> _hit_ratio_5m;
     std::shared_ptr<bvar::Status<double>> _hit_ratio_1h;
     std::shared_ptr<bvar::Status<size_t>> _disk_limit_mode_metrics;
+
+    std::shared_ptr<bvar::LatencyRecorder> _storage_sync_remove_latency;
+    std::shared_ptr<bvar::LatencyRecorder> _storage_async_remove_latency;
 };
 
 } // namespace doris::io
diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp
index 44cad5520ea..8c911ab8f24 100644
--- a/be/src/io/cache/file_block.cpp
+++ b/be/src/io/cache/file_block.cpp
@@ -287,7 +287,8 @@ FileBlocksHolder::~FileBlocksHolder() {
                 std::lock_guard block_lock(file_block->_mutex);
                 file_block->complete_unlocked(block_lock);
                 if (file_block.use_count() == 2 &&
-                    file_block->state_unlock(block_lock) == 
FileBlock::State::EMPTY) {
+                    (file_block->is_deleting() ||
+                     file_block->state_unlock(block_lock) == 
FileBlock::State::EMPTY)) {
                     should_remove = true;
                 }
             }
@@ -297,8 +298,9 @@ FileBlocksHolder::~FileBlocksHolder() {
                 if (file_block.use_count() == 2) {
                     DCHECK(file_block->state_unlock(block_lock) != 
FileBlock::State::DOWNLOADING);
                     // one in cache, one in here
-                    if (file_block->state_unlock(block_lock) == 
FileBlock::State::EMPTY) {
-                        _mgr->remove(file_block, cache_lock, block_lock);
+                    if (file_block->is_deleting() ||
+                        file_block->state_unlock(block_lock) == 
FileBlock::State::EMPTY) {
+                        _mgr->remove(file_block, cache_lock, block_lock, 
false);
                     }
                 }
             }
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index 3a4490d67a3..93c3841693d 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -130,6 +130,10 @@ public:
     FileBlock& operator=(const FileBlock&) = delete;
     FileBlock(const FileBlock&) = delete;
 
+    // block is being used by another thread when deleting, so tag it 
is_deleting and delete later on
+    void set_deleting() { _is_deleting = true; }
+    bool is_deleting() const { return _is_deleting; };
+
 private:
     std::string get_info_for_log_impl(std::lock_guard<std::mutex>& block_lock) 
const;
 
@@ -155,6 +159,7 @@ private:
     std::condition_variable _cv;
     FileCacheKey _key;
     size_t _downloaded_size {0};
+    bool _is_deleting {false};
 };
 
 extern std::ostream& operator<<(std::ostream& os, const FileBlock::State& 
value);
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 8ef296607eb..ade099f0f2d 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1493,7 +1493,7 @@ void CloudCompactionMixin::garbage_collection() {
         for (const auto& [_, file_writer] : 
beta_rowset_writer->get_file_writers()) {
             auto file_key = 
io::BlockFileCache::hash(file_writer->path().filename().native());
             auto* file_cache = 
io::FileCacheFactory::instance()->get_by_path(file_key);
-            file_cache->remove_if_cached(file_key);
+            file_cache->remove_if_cached_async(file_key);
         }
     }
 }
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index d8c2dbe384c..117f01d63e3 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -2919,22 +2919,18 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) {
     auto key = io::BlockFileCache::hash("key1");
     io::BlockFileCache cache(cache_base_path, settings);
     auto sp = SyncPoint::get_instance();
+    FileBlocksHolder* holder;
     SyncPoint::CallbackGuard guard1;
+    // use first block before clean cache
     sp->set_call_back(
-            "BlockFileCache::set_sleep_time",
-            [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; }, 
&guard1);
-    SyncPoint::CallbackGuard guard2;
-    sp->set_call_back(
-            "BlockFileCache::set_remove_batch",
-            [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; }, &guard2);
-    SyncPoint::CallbackGuard guard3;
-    sp->set_call_back(
-            "BlockFileCache::recycle_deleted_blocks",
+            "BlockFileCache::clear_file_cache_async",
             [&](auto&&) {
                 context.cache_type = io::FileCacheType::NORMAL;
-                cache.get_or_set(key, 0, 5, context);
+                FileBlocksHolder h = cache.get_or_set(key, 0, 5, context);
+                holder = new FileBlocksHolder(std::move(h));
             },
-            &guard3);
+            &guard1);
+
     sp->enable_processing();
     ASSERT_TRUE(cache.initialize());
     for (int i = 0; i < 100; i++) {
@@ -2956,13 +2952,12 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) {
                      io::FileBlock::State::DOWNLOADED);
     }
     cache.clear_file_cache_async();
-    while (cache._async_clear_file_cache)
-        ;
-    EXPECT_EQ(cache._cur_cache_size, 20); // 0-4 is used again, so all the 
cache data in DISPOSABLE
-                                          // remain unremoved
+
+    EXPECT_EQ(cache._cur_cache_size, 5); // only one block is in use, the others are 
cleared
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
+    delete holder;
 }
 
 TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) {
@@ -2992,6 +2987,7 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) {
     io::BlockFileCache cache(cache_base_path, settings);
     context.cache_type = io::FileCacheType::TTL;
     context.expiration_time = UnixSeconds() + 3600;
+    FileBlocksHolder* holder;
     auto sp = SyncPoint::get_instance();
     SyncPoint::CallbackGuard guard1;
     sp->set_call_back(
@@ -3003,10 +2999,11 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) {
             [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; }, &guard2);
     SyncPoint::CallbackGuard guard3;
     sp->set_call_back(
-            "BlockFileCache::recycle_deleted_blocks",
+            "BlockFileCache::clear_file_cache_async",
             [&](auto&&) {
                 context.cache_type = io::FileCacheType::NORMAL;
-                cache.get_or_set(key, 0, 5, context);
+                FileBlocksHolder h = cache.get_or_set(key, 0, 5, context);
+                holder = new FileBlocksHolder(std::move(h));
             },
             &guard3);
     sp->enable_processing();
@@ -3040,12 +3037,12 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) {
                      io::FileBlock::State::DOWNLOADED);
     }
     cache.clear_file_cache_async();
-    while (cache._async_clear_file_cache)
-        ;
+
     EXPECT_EQ(cache._cur_cache_size, 5);
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
+    delete holder;
 }
 
 TEST_F(BlockFileCacheTest, remove_directly) {
@@ -3175,8 +3172,7 @@ TEST_F(BlockFileCacheTest, test_factory_1) {
                      io::FileBlock::State::DOWNLOADED);
     }
     FileCacheFactory::instance()->clear_file_caches(false);
-    while (cache->_async_clear_file_cache)
-        ;
+
     EXPECT_EQ(cache->_cur_cache_size, 0);
 
     for (int64_t offset = 0; offset < 60; offset += 5) {
@@ -4868,73 +4864,6 @@ TEST_F(BlockFileCacheTest, remove_from_other_queue_2) {
     }
 }
 
-TEST_F(BlockFileCacheTest, recyle_unvalid_ttl_async) {
-    config::file_cache_ttl_valid_check_interval_second = 4;
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-    fs::create_directories(cache_base_path);
-    TUniqueId query_id;
-    query_id.hi = 1;
-    query_id.lo = 1;
-    io::FileCacheSettings settings;
-    settings.query_queue_size = 30;
-    settings.query_queue_elements = 5;
-    settings.index_queue_size = 30;
-    settings.index_queue_elements = 5;
-    settings.disposable_queue_size = 30;
-    settings.disposable_queue_elements = 5;
-    settings.capacity = 90;
-    settings.max_file_block_size = 30;
-    settings.max_query_cache_size = 30;
-    io::CacheContext context;
-    ReadStatistics rstats;
-    context.stats = &rstats;
-    context.query_id = query_id;
-    auto key = io::BlockFileCache::hash("key1");
-    io::BlockFileCache cache(cache_base_path, settings);
-    context.cache_type = io::FileCacheType::TTL;
-    context.expiration_time = UnixSeconds() + 3600;
-    auto sp = SyncPoint::get_instance();
-    Defer defer {[sp] {
-        sp->clear_call_back("BlockFileCache::set_remove_batch");
-        sp->clear_call_back("BlockFileCache::recycle_deleted_blocks");
-        sp->clear_call_back("BlockFileCache::set_sleep_time");
-    }};
-    sp->set_call_back("BlockFileCache::set_sleep_time",
-                      [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; 
});
-    sp->set_call_back("BlockFileCache::set_remove_batch",
-                      [](auto&& args) { *try_any_cast<int*>(args[0]) = 2; });
-    sp->set_call_back("BlockFileCache::recycle_deleted_blocks",
-                      [&](auto&&) { cache.get_or_set(key, 0, 5, context); });
-    sp->enable_processing();
-    ASSERT_TRUE(cache.initialize());
-    for (int i = 0; i < 100; i++) {
-        if (cache.get_async_open_success()) {
-            break;
-        };
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-    }
-    for (int64_t offset = 0; offset < 60; offset += 5) {
-        auto holder = cache.get_or_set(key, offset, 5, context);
-        auto segments = fromHolder(holder);
-        ASSERT_EQ(segments.size(), 1);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::EMPTY);
-        ASSERT_TRUE(segments[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
-        download(segments[0]);
-        assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4),
-                     io::FileBlock::State::DOWNLOADED);
-    }
-    std::this_thread::sleep_for(
-            
std::chrono::seconds(config::file_cache_ttl_valid_check_interval_second + 2));
-    config::file_cache_ttl_valid_check_interval_second = 0;
-    EXPECT_EQ(cache._cur_cache_size, 5);
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-}
-
 TEST_F(BlockFileCacheTest, reset_capacity) {
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
@@ -5004,8 +4933,7 @@ TEST_F(BlockFileCacheTest, reset_capacity) {
                      io::FileBlock::State::DOWNLOADED);
     }
     std::cout << cache.reset_capacity(30) << std::endl;
-    while (cache._async_clear_file_cache)
-        ;
+
     EXPECT_EQ(cache._cur_cache_size, 30);
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to