This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e74a5a18c7b [enhancement](cloud) Persist LRU information for file cache (#49456)
e74a5a18c7b is described below

commit e74a5a18c7bae084348339c38a1f80e9b1ee3f4e
Author: zhengyu <[email protected]>
AuthorDate: Fri Jul 4 20:33:51 2025 +0800

    [enhancement](cloud) Persist LRU information for file cache (#49456)
    
    When the system restarts, the in-memory LRU queue is lost because it is
    not persisted. The cache then has to re-scan the disk directory to load
    data, which leads to the following issues:
    1. The loading order after restart depends on directory traversal, so
    the original eviction order cannot be preserved.
    2. If the system enters resource-limit mode after restart, it may
    mistakenly delete hot data that users access frequently.
    
    In this commit, we periodically dump the LRU queue information to disk
    and rebuild the LRU queue from it upon restart. Since the LRU queue can
    be large, we only dump its tail (the part that will be evicted first),
    with the number of records controlled by a config option.
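    
    For reference, these are the new config knobs and their defaults (taken
    from be/src/common/config.cpp in this commit); the be.conf snippet below
    is only an illustration of those defaults:
    
        file_cache_background_lru_dump_interval_ms = 60000
        file_cache_background_lru_dump_update_cnt_threshold = 1000
        file_cache_background_lru_dump_tail_record_num = 5000000
        file_cache_background_lru_log_replay_interval_ms = 1000
        enable_evaluate_shadow_queue_diff = false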
---
 be/src/common/config.cpp                           |   6 +
 be/src/common/config.h                             |   9 +-
 be/src/http/action/shrink_mem_action.cpp           |   2 +
 be/src/io/cache/block_file_cache.cpp               | 241 ++++++++---
 be/src/io/cache/block_file_cache.h                 | 130 ++----
 be/src/io/cache/cache_lru_dumper.cpp               | 465 +++++++++++++++++++++
 be/src/io/cache/cache_lru_dumper.h                 |  83 ++++
 be/src/io/cache/file_block.cpp                     |   8 +-
 be/src/io/cache/file_cache_common.cpp              |  57 +++
 be/src/io/cache/file_cache_common.h                |  99 +++++
 be/src/io/cache/fs_file_cache_storage.cpp          |  11 +-
 be/src/io/cache/lru_queue_recorder.cpp             | 132 ++++++
 be/src/io/cache/lru_queue_recorder.h               |  83 ++++
 be/src/runtime/exec_env.h                          |   3 +
 be/test/io/cache/block_file_cache_test.cpp         | 234 +++--------
 be/test/io/cache/block_file_cache_test_common.h    | 134 ++++++
 .../io/cache/block_file_cache_test_lru_dump.cpp    | 404 ++++++++++++++++++
 be/test/io/cache/cache_lru_dumper_test.cpp         | 149 +++++++
 be/test/io/cache/lru_queue_test.cpp                | 117 ++++++
 .../proto/file_cache.proto                         |  52 ++-
 .../doris/regression/suite/SuiteCluster.groovy     |   3 +
 .../suites/demo_p0/test_lru_persist.groovy         |  94 +++++
 22 files changed, 2175 insertions(+), 341 deletions(-)
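
Note on the new on-disk format: be/src/io/cache/cache_lru_dumper.cpp (added
below) writes each queue's tail to a dump file that ends with a small
fixed-size footer. A rough sketch of that footer, with field order and widths
mirroring the serialization code in that file (a sketch for orientation only,
not the verbatim declaration from the header):

    struct Footer {
        uint64_t meta_offset; // little-endian offset of LRUDumpMetaPb in the file
        uint32_t checksum;    // little-endian, written as 0 in this commit
        uint8_t  version;     // dump format version, 1 in this commit
        char     magic[3];    // "DOR"
    };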

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 40d713ad58b..6f2c679d689 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1114,6 +1114,12 @@ 
DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
 DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
 DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
 DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
+DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, "60000");
+// dump the queue only if it has been updated more than this many times since the last dump
+DEFINE_mInt64(file_cache_background_lru_dump_update_cnt_threshold, "1000");
+DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000");
+DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000");
+DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false");
 
 DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
 DEFINE_Int32(file_cache_downloader_thread_num_max, "32");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 85e088c2106..ab2e08f329f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1153,9 +1153,16 @@ 
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
 DECLARE_mInt64(file_cache_background_monitor_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_batch);
-
 DECLARE_Int32(file_cache_downloader_thread_num_min);
 DECLARE_Int32(file_cache_downloader_thread_num_max);
+// used to persist LRU information before BE reboot and load it back after restart
+DECLARE_mInt64(file_cache_background_lru_dump_interval_ms);
+// dump the queue only if it has been updated more than this many times since the last dump
+DECLARE_mInt64(file_cache_background_lru_dump_update_cnt_threshold);
+DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num);
+DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms);
+DECLARE_mBool(enable_evaluate_shadow_queue_diff);
+
 // inverted index searcher cache
 // cache entry stay time after lookup
 DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
diff --git a/be/src/http/action/shrink_mem_action.cpp 
b/be/src/http/action/shrink_mem_action.cpp
index 66331f4a943..41f55365cff 100644
--- a/be/src/http/action/shrink_mem_action.cpp
+++ b/be/src/http/action/shrink_mem_action.cpp
@@ -36,6 +36,8 @@ void ShrinkMemAction::handle(HttpRequest* req) {
     MemoryReclamation::revoke_process_memory("ShrinkMemAction");
     LOG(INFO) << "shrink memory triggered, using Process GC Free Memory";
     HttpChannel::send_reply(req, HttpStatus::OK, "shrinking");
+
+    ExecEnv::GetInstance()->set_is_upgrading();
 }
 
 } // namespace doris
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index 1e16d79dc5c..4522db10601 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -20,8 +20,13 @@
 
 #include "io/cache/block_file_cache.h"
 
+#include <cstdio>
+#include <fstream>
+
 #include "common/status.h"
 #include "cpp/sync_point.h"
+#include "gen_cpp/file_cache.pb.h"
+#include "runtime/exec_env.h"
 
 #if defined(__APPLE__)
 #include <sys/mount.h>
@@ -220,11 +225,14 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
             _cache_base_path.c_str(), 
"file_cache_storage_async_remove_latency_us");
     _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
             _cache_base_path.c_str(), 
"file_cache_evict_in_advance_latency_us");
-
+    _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>(
+            _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
     _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
             _cache_base_path.c_str(), "file_cache_recycle_keys_length");
     _ttl_gc_latency_us = 
std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
                                                                  
"file_cache_ttl_gc_latency_us");
+    _shadow_queue_levenshtein_distance = 
std::make_shared<bvar::LatencyRecorder>(
+            _cache_base_path.c_str(), 
"file_cache_shadow_queue_levenshtein_distance");
 
     _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                  cache_settings.disposable_queue_elements, 60 
* 60);
@@ -234,6 +242,9 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
                              24 * 60 * 60);
     _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, 
cache_settings.ttl_queue_elements,
                           std::numeric_limits<int>::max());
+
+    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
+    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
     if (cache_settings.storage == "memory") {
         _storage = std::make_unique<MemFileCacheStorage>();
         _cache_base_path = "memory";
@@ -250,32 +261,6 @@ UInt128Wrapper BlockFileCache::hash(const std::string& 
path) {
     return UInt128Wrapper(value);
 }
 
-std::string BlockFileCache::cache_type_to_string(FileCacheType type) {
-    switch (type) {
-    case FileCacheType::INDEX:
-        return "_idx";
-    case FileCacheType::DISPOSABLE:
-        return "_disposable";
-    case FileCacheType::NORMAL:
-        return "";
-    case FileCacheType::TTL:
-        return "_ttl";
-    }
-    return "";
-}
-
-FileCacheType BlockFileCache::string_to_cache_type(const std::string& str) {
-    if (str == "idx") {
-        return FileCacheType::INDEX;
-    } else if (str == "disposable") {
-        return FileCacheType::DISPOSABLE;
-    } else if (str == "ttl") {
-        return FileCacheType::TTL;
-    }
-    DCHECK(false) << "The string is " << str;
-    return FileCacheType::DISPOSABLE;
-}
-
 BlockFileCache::QueryFileCacheContextHolderPtr 
BlockFileCache::get_query_context_holder(
         const TUniqueId& query_id) {
     SCOPED_CACHE_LOCK(_mutex, this);
@@ -348,12 +333,35 @@ Status BlockFileCache::initialize() {
 Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& 
cache_lock) {
     DCHECK(!_is_initialized);
     _is_initialized = true;
+    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
+        // requirements:
+        // 1. restored data should not overwrite the last dump
+        // 2. restore should happen before load and async load
+        // 3. all queues should be restored sequentially to avoid conflict
+        // TODO(zhengyu): we could parallelize them, but that would add complexity, so let's check
+        // the time cost to see whether any improvement is necessary
+        restore_lru_queues_from_disk(cache_lock);
+    }
     RETURN_IF_ERROR(_storage->init(this));
     _cache_background_monitor_thread = 
std::thread(&BlockFileCache::run_background_monitor, this);
+    pthread_setname_np(_cache_background_monitor_thread.native_handle(), 
"run_background_monitor");
     _cache_background_ttl_gc_thread = 
std::thread(&BlockFileCache::run_background_ttl_gc, this);
+    pthread_setname_np(_cache_background_ttl_gc_thread.native_handle(), 
"run_background_ttl_gc");
     _cache_background_gc_thread = 
std::thread(&BlockFileCache::run_background_gc, this);
+    pthread_setname_np(_cache_background_gc_thread.native_handle(), 
"run_background_gc");
     _cache_background_evict_in_advance_thread =
             std::thread(&BlockFileCache::run_background_evict_in_advance, 
this);
+    
pthread_setname_np(_cache_background_evict_in_advance_thread.native_handle(),
+                       "run_background_evict_in_advance");
+
+    // Initialize LRU dump thread and restore queues
+    _cache_background_lru_dump_thread = 
std::thread(&BlockFileCache::run_background_lru_dump, this);
+    pthread_setname_np(_cache_background_lru_dump_thread.native_handle(),
+                       "run_background_lru_dump");
+    _cache_background_lru_log_replay_thread =
+            std::thread(&BlockFileCache::run_background_lru_log_replay, this);
+    pthread_setname_np(_cache_background_lru_log_replay_thread.native_handle(),
+                       "run_background_lru_log_replay");
 
     return Status::OK();
 }
@@ -369,6 +377,9 @@ void BlockFileCache::use_cell(const FileBlockCell& cell, 
FileBlocks* result, boo
     if (cell.queue_iterator && move_iter_flag) {
         queue.move_to_end(*cell.queue_iterator, cache_lock);
     }
+    _lru_recorder->record_queue_event(cell.file_block->cache_type(), 
CacheLRULogType::MOVETOBACK,
+                                      cell.file_block->_key.hash, 
cell.file_block->_key.offset,
+                                      cell.size());
 
     cell.update_atime();
 }
@@ -442,10 +453,16 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& 
hash, const CacheConte
             if (st.ok()) {
                 auto& queue = get_queue(origin_type);
                 queue.remove(cell.queue_iterator.value(), cache_lock);
+                _lru_recorder->record_queue_event(origin_type, 
CacheLRULogType::REMOVE,
+                                                  
cell.file_block->get_hash_value(),
+                                                  cell.file_block->offset(), 
cell.size());
                 auto& ttl_queue = get_queue(FileCacheType::TTL);
                 cell.queue_iterator =
                         ttl_queue.add(cell.file_block->get_hash_value(), 
cell.file_block->offset(),
                                       cell.file_block->range().size(), 
cache_lock);
+                _lru_recorder->record_queue_event(FileCacheType::TTL, 
CacheLRULogType::ADD,
+                                                  
cell.file_block->get_hash_value(),
+                                                  cell.file_block->offset(), 
cell.size());
             } else {
                 LOG_WARNING("Failed to change key meta").error(st);
             }
@@ -482,11 +499,18 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& 
hash, const CacheConte
                     if (cell.queue_iterator) {
                         auto& ttl_queue = get_queue(FileCacheType::TTL);
                         ttl_queue.remove(cell.queue_iterator.value(), 
cache_lock);
+                        _lru_recorder->record_queue_event(FileCacheType::TTL,
+                                                          
CacheLRULogType::REMOVE,
+                                                          
cell.file_block->get_hash_value(),
+                                                          
cell.file_block->offset(), cell.size());
                     }
                     auto& queue = get_queue(FileCacheType::NORMAL);
                     cell.queue_iterator =
                             queue.add(cell.file_block->get_hash_value(), 
cell.file_block->offset(),
                                       cell.file_block->range().size(), 
cache_lock);
+                    _lru_recorder->record_queue_event(FileCacheType::NORMAL, 
CacheLRULogType::ADD,
+                                                      
cell.file_block->get_hash_value(),
+                                                      
cell.file_block->offset(), cell.size());
                 } else {
                     LOG_WARNING("Failed to change key meta").error(st);
                 }
@@ -556,6 +580,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& 
hash, const CacheConte
 
 std::string BlockFileCache::clear_file_cache_async() {
     LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
+    _lru_dumper->remove_lru_dump_files();
     int64_t num_cells_all = 0;
     int64_t num_cells_to_delete = 0;
     int64_t num_cells_wait_recycle = 0;
@@ -597,6 +622,7 @@ std::string BlockFileCache::clear_file_cache_async() {
        << " num_cells_wait_recycle=" << num_cells_wait_recycle;
     auto msg = ss.str();
     LOG(INFO) << msg;
+    _lru_dumper->remove_lru_dump_files();
     return msg;
 }
 
@@ -791,6 +817,9 @@ BlockFileCache::FileBlockCell* 
BlockFileCache::add_cell(const UInt128Wrapper& ha
 
     auto& queue = get_queue(cell.file_block->cache_type());
     cell.queue_iterator = queue.add(hash, offset, size, cache_lock);
+    _lru_recorder->record_queue_event(cell.file_block->cache_type(), 
CacheLRULogType::ADD,
+                                      cell.file_block->get_hash_value(), 
cell.file_block->offset(),
+                                      cell.size());
 
     if (cell.file_block->cache_type() == FileCacheType::TTL) {
         if (_key_to_time.find(hash) == _key_to_time.end()) {
@@ -828,7 +857,7 @@ size_t BlockFileCache::try_release() {
     return trash.size();
 }
 
-BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
+LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
     switch (type) {
     case FileCacheType::INDEX:
         return _index_queue;
@@ -844,7 +873,7 @@ BlockFileCache::LRUQueue& 
BlockFileCache::get_queue(FileCacheType type) {
     return _normal_queue;
 }
 
-const BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) 
const {
+const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
     switch (type) {
     case FileCacheType::INDEX:
         return _index_queue;
@@ -971,7 +1000,7 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& 
hash, const CacheContext&
     size_t cur_cache_size = _cur_cache_size;
     size_t query_context_cache_size = 
query_context->get_cache_size(cache_lock);
 
-    std::vector<BlockFileCache::LRUQueue::Iterator> ghost;
+    std::vector<LRUQueue::Iterator> ghost;
     std::vector<FileBlockCell*> to_evict;
 
     size_t max_size = queue.get_max_size();
@@ -1074,11 +1103,19 @@ bool BlockFileCache::remove_if_ttl_file_blocks(const 
UInt128Wrapper& file_key, b
                     if (st.ok()) {
                         if (cell.queue_iterator) {
                             ttl_queue.remove(cell.queue_iterator.value(), 
cache_lock);
+                            _lru_recorder->record_queue_event(
+                                    FileCacheType::TTL, 
CacheLRULogType::REMOVE,
+                                    cell.file_block->get_hash_value(), 
cell.file_block->offset(),
+                                    cell.size());
                         }
                         auto& queue = get_queue(FileCacheType::NORMAL);
                         cell.queue_iterator = queue.add(
                                 cell.file_block->get_hash_value(), 
cell.file_block->offset(),
                                 cell.file_block->range().size(), cache_lock);
+                        
_lru_recorder->record_queue_event(FileCacheType::NORMAL,
+                                                          CacheLRULogType::ADD,
+                                                          
cell.file_block->get_hash_value(),
+                                                          
cell.file_block->offset(), cell.size());
                     } else {
                         LOG_WARNING("Failed to change cache type to 
normal").error(st);
                     }
@@ -1367,6 +1404,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& 
cache_lock, U& block_lo
     if (cell->queue_iterator) {
         auto& queue = get_queue(file_block->cache_type());
         queue.remove(*cell->queue_iterator, cache_lock);
+        _lru_recorder->record_queue_event(file_block->cache_type(), 
CacheLRULogType::REMOVE,
+                                          cell->file_block->get_hash_value(),
+                                          cell->file_block->offset(), 
cell->size());
     }
     *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
             << file_block->range().size();
@@ -1482,46 +1522,34 @@ 
BlockFileCache::FileBlockCell::FileBlockCell(FileBlockSPtr file_block,
     }
 }
 
-BlockFileCache::LRUQueue::Iterator BlockFileCache::LRUQueue::add(
-        const UInt128Wrapper& hash, size_t offset, size_t size,
-        std::lock_guard<std::mutex>& /* cache_lock */) {
+LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, 
size_t size,
+                                 std::lock_guard<std::mutex>& /* cache_lock 
*/) {
     cache_size += size;
     auto iter = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, 
size));
     map.insert(std::make_pair(std::make_pair(hash, offset), iter));
     return iter;
 }
 
-template <class T>
-    requires IsXLock<T>
-void BlockFileCache::LRUQueue::remove(Iterator queue_it, T& /* cache_lock */) {
-    cache_size -= queue_it->size;
-    map.erase(std::make_pair(queue_it->hash, queue_it->offset));
-    queue.erase(queue_it);
-}
-
-void BlockFileCache::LRUQueue::remove_all(std::lock_guard<std::mutex>& /* 
cache_lock */) {
+void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
     queue.clear();
     map.clear();
     cache_size = 0;
 }
 
-void BlockFileCache::LRUQueue::move_to_end(Iterator queue_it,
-                                           std::lock_guard<std::mutex>& /* 
cache_lock */) {
+void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* 
cache_lock */) {
     queue.splice(queue.end(), queue, queue_it);
 }
-bool BlockFileCache::LRUQueue::contains(const UInt128Wrapper& hash, size_t 
offset,
-                                        std::lock_guard<std::mutex>& /* 
cache_lock */) const {
+bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
+                        std::lock_guard<std::mutex>& /* cache_lock */) const {
     return map.find(std::make_pair(hash, offset)) != map.end();
 }
 
-BlockFileCache::LRUQueue::Iterator BlockFileCache::LRUQueue::get(
-        const UInt128Wrapper& hash, size_t offset,
-        std::lock_guard<std::mutex>& /* cache_lock */) const {
+LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
+                                 std::lock_guard<std::mutex>& /* cache_lock 
*/) const {
     return map.find(std::make_pair(hash, offset))->second;
 }
 
-std::string BlockFileCache::LRUQueue::to_string(
-        std::lock_guard<std::mutex>& /* cache_lock */) const {
+std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) 
const {
     std::string result;
     for (const auto& [hash, offset, size] : queue) {
         if (!result.empty()) {
@@ -1532,6 +1560,48 @@ std::string BlockFileCache::LRUQueue::to_string(
     return result;
 }
 
+size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
+                                           std::lock_guard<std::mutex>& 
cache_lock) {
+    std::list<FileKeyAndOffset> target_queue = this->queue;
+    std::list<FileKeyAndOffset> base_queue = base.queue;
+    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), 
target_queue.end());
+    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
+
+    size_t m = vec1.size();
+    size_t n = vec2.size();
+
+    // Create a 2D vector (matrix) to store the Levenshtein distances
+    // dp[i][j] will hold the distance between the first i elements of vec1 
and the first j elements of vec2
+    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
+
+    // Initialize the first row and column of the matrix
+    // The distance between an empty list and a list of length k is k (all 
insertions or deletions)
+    for (size_t i = 0; i <= m; ++i) {
+        dp[i][0] = i;
+    }
+    for (size_t j = 0; j <= n; ++j) {
+        dp[0][j] = j;
+    }
+
+    // Fill the matrix using dynamic programming
+    for (size_t i = 1; i <= m; ++i) {
+        for (size_t j = 1; j <= n; ++j) {
+            // Check if the current elements of both vectors are equal
+            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
+                           vec1[i - 1].offset == vec2[j - 1].offset)
+                                  ? 0
+                                  : 1;
+            // Calculate the minimum cost of three possible operations:
+            // 1. Insertion: dp[i][j-1] + 1
+            // 2. Deletion: dp[i-1][j] + 1
+            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 
1 if not)
+            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 
1][j - 1] + cost});
+        }
+    }
+    // The bottom-right cell of the matrix contains the Levenshtein distance
+    return dp[m][n];
+}
+
 std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
     SCOPED_CACHE_LOCK(_mutex, this);
     return dump_structure_unlocked(hash, cache_lock);
@@ -1583,9 +1653,15 @@ void BlockFileCache::change_cache_type(const 
UInt128Wrapper& hash, size_t offset
             auto& cur_queue = get_queue(cell.file_block->cache_type());
             DCHECK(cell.queue_iterator.has_value());
             cur_queue.remove(*cell.queue_iterator, cache_lock);
+            _lru_recorder->record_queue_event(
+                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
+                    cell.file_block->get_hash_value(), 
cell.file_block->offset(), cell.size());
             auto& new_queue = get_queue(new_type);
             cell.queue_iterator =
                     new_queue.add(hash, offset, 
cell.file_block->range().size(), cache_lock);
+            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
+                                              
cell.file_block->get_hash_value(),
+                                              cell.file_block->offset(), 
cell.size());
         }
     }
 }
@@ -1982,9 +2058,15 @@ void BlockFileCache::modify_expiration_time(const 
UInt128Wrapper& hash,
             if (st.ok()) {
                 auto& queue = get_queue(origin_type);
                 queue.remove(cell.queue_iterator.value(), cache_lock);
+                _lru_recorder->record_queue_event(origin_type, 
CacheLRULogType::REMOVE,
+                                                  
cell.file_block->get_hash_value(),
+                                                  cell.file_block->offset(), 
cell.size());
                 auto& ttl_queue = get_queue(FileCacheType::TTL);
                 cell.queue_iterator = ttl_queue.add(hash, 
cell.file_block->offset(),
                                                     
cell.file_block->range().size(), cache_lock);
+                _lru_recorder->record_queue_event(FileCacheType::TTL, 
CacheLRULogType::ADD,
+                                                  
cell.file_block->get_hash_value(),
+                                                  cell.file_block->offset(), 
cell.size());
             }
             if (!st.ok()) {
                 LOG_WARNING("").error(st);
@@ -2066,6 +2148,7 @@ bool BlockFileCache::try_reserve_during_async_load(size_t 
size,
 }
 
 std::string BlockFileCache::clear_file_cache_directly() {
+    _lru_dumper->remove_lru_dump_files();
     using namespace std::chrono;
     std::stringstream ss;
     auto start = steady_clock::now();
@@ -2113,9 +2196,9 @@ std::string BlockFileCache::clear_file_cache_directly() {
        << " cache_size=" << cache_size << " index_queue_size=" << 
index_queue_size
        << " normal_queue_size=" << normal_queue_size
        << " disposible_queue_size=" << disposible_queue_size << 
"ttl_queue_size=" << ttl_queue_size;
-
     auto msg = ss.str();
     LOG(INFO) << msg;
+    _lru_dumper->remove_lru_dump_files();
     return msg;
 }
 
@@ -2144,6 +2227,60 @@ void BlockFileCache::update_ttl_atime(const 
UInt128Wrapper& hash) {
     };
 }
 
+void BlockFileCache::run_background_lru_log_replay() {
+    while (!_close) {
+        int64_t interval_ms = 
config::file_cache_background_lru_log_replay_interval_ms;
+        {
+            std::unique_lock close_lock(_close_mtx);
+            _close_cv.wait_for(close_lock, 
std::chrono::milliseconds(interval_ms));
+            if (_close) {
+                break;
+            }
+        }
+
+        _lru_recorder->replay_queue_event(FileCacheType::TTL);
+        _lru_recorder->replay_queue_event(FileCacheType::INDEX);
+        _lru_recorder->replay_queue_event(FileCacheType::NORMAL);
+        _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE);
+
+        if (config::enable_evaluate_shadow_queue_diff) {
+            SCOPED_CACHE_LOCK(_mutex, this);
+            _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
+            _lru_recorder->evaluate_queue_diff(_index_queue, "index", 
cache_lock);
+            _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", 
cache_lock);
+            _lru_recorder->evaluate_queue_diff(_disposable_queue, 
"disposable", cache_lock);
+        }
+    }
+}
+
+void BlockFileCache::run_background_lru_dump() {
+    while (!_close) {
+        int64_t interval_ms = 
config::file_cache_background_lru_dump_interval_ms;
+        {
+            std::unique_lock close_lock(_close_mtx);
+            _close_cv.wait_for(close_lock, 
std::chrono::milliseconds(interval_ms));
+            if (_close) {
+                break;
+            }
+        }
+
+        if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
+            !ExecEnv::GetInstance()->get_is_upgrading()) {
+            _lru_dumper->dump_queue("disposable");
+            _lru_dumper->dump_queue("normal");
+            _lru_dumper->dump_queue("index");
+            _lru_dumper->dump_queue("ttl");
+        }
+    }
+}
+
+void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& 
cache_lock) {
+    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
+    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
+    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
+    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
+}
+
 std::map<std::string, double> BlockFileCache::get_stats() {
     std::map<std::string, double> stats;
     stats["hits_ratio"] = (double)_hit_ratio->get_value();
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index 7c046cc1627..eaf62c1a82c 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -26,9 +26,11 @@
 #include <optional>
 #include <thread>
 
+#include "io/cache/cache_lru_dumper.h"
 #include "io/cache/file_block.h"
 #include "io/cache/file_cache_common.h"
 #include "io/cache/file_cache_storage.h"
+#include "io/cache/lru_queue_recorder.h"
 #include "util/runtime_profile.h"
 #include "util/threadpool.h"
 
@@ -72,10 +74,6 @@ private:
 #define SCOPED_CACHE_LOCK(MUTEX, cache) std::lock_guard cache_lock(MUTEX);
 #endif
 
-template <class Lock>
-concept IsXLock = std::same_as<Lock, std::lock_guard<std::mutex>> ||
-                  std::same_as<Lock, std::unique_lock<std::mutex>>;
-
 class FSFileCacheStorage;
 
 // The BlockFileCache is responsible for the management of the blocks
@@ -85,16 +83,16 @@ class BlockFileCache {
     friend class MemFileCacheStorage;
     friend class FileBlock;
     friend struct FileBlocksHolder;
+    friend class CacheLRUDumper;
+    friend class LRUQueueRecorder;
 
 public:
-    static std::string cache_type_to_string(FileCacheType type);
-    static FileCacheType string_to_cache_type(const std::string& str);
     // hash the file_name to uint128
     static UInt128Wrapper hash(const std::string& path);
 
     BlockFileCache(const std::string& cache_base_path, const 
FileCacheSettings& cache_settings);
 
-    ~BlockFileCache() {
+    virtual ~BlockFileCache() {
         {
             std::lock_guard lock(_close_mtx);
             _close = true;
@@ -112,6 +110,12 @@ public:
         if (_cache_background_evict_in_advance_thread.joinable()) {
             _cache_background_evict_in_advance_thread.join();
         }
+        if (_cache_background_lru_dump_thread.joinable()) {
+            _cache_background_lru_dump_thread.join();
+        }
+        if (_cache_background_lru_log_replay_thread.joinable()) {
+            _cache_background_lru_log_replay_thread.join();
+        }
     }
 
     /// Restore cache from local filesystem.
@@ -216,86 +220,6 @@ public:
     // for be UTs
     std::map<std::string, double> get_stats_unsafe();
 
-    class LRUQueue {
-    public:
-        LRUQueue() = default;
-        LRUQueue(size_t max_size, size_t max_element_size, int64_t 
hot_data_interval)
-                : max_size(max_size),
-                  max_element_size(max_element_size),
-                  hot_data_interval(hot_data_interval) {}
-
-        struct HashFileKeyAndOffset {
-            std::size_t operator()(const std::pair<UInt128Wrapper, size_t>& 
pair) const {
-                return KeyHash()(pair.first) + pair.second;
-            }
-        };
-
-        struct FileKeyAndOffset {
-            UInt128Wrapper hash;
-            size_t offset;
-            size_t size;
-
-            FileKeyAndOffset(const UInt128Wrapper& hash, size_t offset, size_t 
size)
-                    : hash(hash), offset(offset), size(size) {}
-        };
-
-        using Iterator = typename std::list<FileKeyAndOffset>::iterator;
-
-        size_t get_max_size() const { return max_size; }
-        size_t get_max_element_size() const { return max_element_size; }
-
-        template <class T>
-            requires IsXLock<T>
-        size_t get_capacity(T& /* cache_lock */) const {
-            return cache_size;
-        }
-
-        size_t get_capacity_unsafe() const { return cache_size; }
-
-        size_t get_elements_num_unsafe() const { return queue.size(); }
-
-        size_t get_elements_num(std::lock_guard<std::mutex>& /* cache_lock */) 
const {
-            return queue.size();
-        }
-
-        Iterator add(const UInt128Wrapper& hash, size_t offset, size_t size,
-                     std::lock_guard<std::mutex>& cache_lock);
-        template <class T>
-            requires IsXLock<T>
-        void remove(Iterator queue_it, T& cache_lock);
-
-        void move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& 
cache_lock);
-
-        std::string to_string(std::lock_guard<std::mutex>& cache_lock) const;
-
-        bool contains(const UInt128Wrapper& hash, size_t offset,
-                      std::lock_guard<std::mutex>& cache_lock) const;
-
-        Iterator begin() { return queue.begin(); }
-
-        Iterator end() { return queue.end(); }
-
-        void remove_all(std::lock_guard<std::mutex>& cache_lock);
-
-        Iterator get(const UInt128Wrapper& hash, size_t offset,
-                     std::lock_guard<std::mutex>& /* cache_lock */) const;
-
-        int64_t get_hot_data_interval() const { return hot_data_interval; }
-
-        void clear(std::lock_guard<std::mutex>& cache_lock) {
-            queue.clear();
-            map.clear();
-            cache_size = 0;
-        }
-
-        size_t max_size;
-        size_t max_element_size;
-        std::list<FileKeyAndOffset> queue;
-        std::unordered_map<std::pair<UInt128Wrapper, size_t>, Iterator, 
HashFileKeyAndOffset> map;
-        size_t cache_size = 0;
-        int64_t hot_data_interval {0};
-    };
-
     using AccessRecord =
             std::unordered_map<AccessKeyAndOffset, LRUQueue::Iterator, 
KeyAndOffsetHash>;
 
@@ -389,8 +313,8 @@ private:
         FileBlockCell(const FileBlockCell&) = delete;
     };
 
-    BlockFileCache::LRUQueue& get_queue(FileCacheType type);
-    const BlockFileCache::LRUQueue& get_queue(FileCacheType type) const;
+    LRUQueue& get_queue(FileCacheType type);
+    const LRUQueue& get_queue(FileCacheType type) const;
 
     template <class T, class U>
         requires IsXLock<T> && IsXLock<U>
@@ -403,9 +327,9 @@ private:
         requires IsXLock<T>
     FileBlockCell* get_cell(const UInt128Wrapper& hash, size_t offset, T& 
cache_lock);
 
-    FileBlockCell* add_cell(const UInt128Wrapper& hash, const CacheContext& 
context, size_t offset,
-                            size_t size, FileBlock::State state,
-                            std::lock_guard<std::mutex>& cache_lock);
+    virtual FileBlockCell* add_cell(const UInt128Wrapper& hash, const 
CacheContext& context,
+                                    size_t offset, size_t size, 
FileBlock::State state,
+                                    std::lock_guard<std::mutex>& cache_lock);
 
     Status initialize_unlocked(std::lock_guard<std::mutex>& cache_lock);
 
@@ -463,6 +387,9 @@ private:
     void run_background_monitor();
     void run_background_ttl_gc();
     void run_background_gc();
+    void run_background_lru_log_replay();
+    void run_background_lru_dump();
+    void restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock);
     void run_background_evict_in_advance();
 
     bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type,
@@ -489,6 +416,17 @@ private:
                                std::lock_guard<std::mutex>& cache_lock, 
size_t& cur_removed_size,
                                bool evict_in_advance);
 
+    Status check_ofstream_status(std::ofstream& out, std::string& filename);
+    Status dump_one_lru_entry(std::ofstream& out, std::string& filename, const 
UInt128Wrapper& hash,
+                              size_t offset, size_t size);
+    Status finalize_dump(std::ofstream& out, size_t entry_num, std::string& 
tmp_filename,
+                         std::string& final_filename, size_t& file_size);
+    Status check_ifstream_status(std::ifstream& in, std::string& filename);
+    Status parse_dump_footer(std::ifstream& in, std::string& filename, size_t& 
entry_num);
+    Status parse_one_lru_entry(std::ifstream& in, std::string& filename, 
UInt128Wrapper& hash,
+                               size_t& offset, size_t& size);
+    void remove_lru_dump_files();
+
     // info
     std::string _cache_base_path;
     size_t _capacity = 0;
@@ -503,6 +441,8 @@ private:
     std::thread _cache_background_ttl_gc_thread;
     std::thread _cache_background_gc_thread;
     std::thread _cache_background_evict_in_advance_thread;
+    std::thread _cache_background_lru_dump_thread;
+    std::thread _cache_background_lru_log_replay_thread;
     std::atomic_bool _async_open_done {false};
     // disk space or inode is less than the specified value
     bool _disk_resource_limit_mode {false};
@@ -532,6 +472,9 @@ private:
     // keys for async remove
     RecycleFileCacheKeys _recycle_keys;
 
+    std::unique_ptr<LRUQueueRecorder> _lru_recorder;
+    std::unique_ptr<CacheLRUDumper> _lru_dumper;
+
     // metrics
     std::shared_ptr<bvar::Status<size_t>> _cache_capacity_metrics;
     std::shared_ptr<bvar::Status<size_t>> _cur_cache_size_metrics;
@@ -576,11 +519,14 @@ private:
     std::shared_ptr<bvar::LatencyRecorder> _evict_in_advance_latency_us;
     std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder;
     std::shared_ptr<bvar::LatencyRecorder> _ttl_gc_latency_us;
+
+    std::shared_ptr<bvar::LatencyRecorder> _shadow_queue_levenshtein_distance;
     // keep _storage last so it will deconstruct first
     // otherwise, load_cache_info_into_memory might crash
     // coz it will use other members of BlockFileCache
     // so join this async load thread first
     std::unique_ptr<FileCacheStorage> _storage;
+    std::shared_ptr<bvar::LatencyRecorder> _lru_dump_latency_us;
 };
 
 } // namespace doris::io
diff --git a/be/src/io/cache/cache_lru_dumper.cpp 
b/be/src/io/cache/cache_lru_dumper.cpp
new file mode 100644
index 00000000000..78632eca754
--- /dev/null
+++ b/be/src/io/cache/cache_lru_dumper.cpp
@@ -0,0 +1,465 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+#include "io/cache/lru_queue_recorder.h"
+#include "util/crc32c.h"
+
+namespace doris::io {
+
+std::string CacheLRUDumper::Footer::serialize_as_string() const {
+    std::string result;
+    result.reserve(sizeof(Footer));
+
+    // Serialize meta_offset (convert to little-endian)
+    uint64_t meta_offset_le = htole64(meta_offset);
+    result.append(reinterpret_cast<const char*>(&meta_offset_le), 
sizeof(meta_offset_le));
+
+    // Serialize checksum (convert to little-endian)
+    uint32_t checksum_le = htole32(checksum);
+    result.append(reinterpret_cast<const char*>(&checksum_le), 
sizeof(checksum_le));
+
+    result.append(reinterpret_cast<const char*>(&version), sizeof(version));
+
+    // Serialize magic
+    result.append(magic, sizeof(magic));
+
+    return result;
+}
+
+bool CacheLRUDumper::Footer::deserialize_from_string(const std::string& data) {
+    DCHECK(data.size() == sizeof(Footer));
+
+    const char* ptr = data.data();
+
+    // Deserialize meta_offset (convert from little-endian)
+    uint64_t meta_offset_le;
+    std::memcpy(&meta_offset_le, ptr, sizeof(meta_offset_le));
+    meta_offset = le64toh(meta_offset_le);
+    ptr += sizeof(meta_offset_le);
+
+    // Deserialize checksum (convert from little-endian)
+    uint32_t checksum_le;
+    std::memcpy(&checksum_le, ptr, sizeof(checksum_le));
+    checksum = le32toh(checksum_le);
+    ptr += sizeof(checksum_le);
+
+    version = *((uint8_t*)ptr);
+    ptr += sizeof(version);
+
+    // Deserialize magic
+    std::memcpy(magic, ptr, sizeof(magic));
+
+    return true;
+}
+
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (List<offset,size,crc>)         |
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)|MAGIC (3B)|
+    // +-----------------------------------------------+
+    //
+    // Why are we not serializing the whole dump as a single protobuf message?
+    // AFAIK, the current protobuf version does not support streaming mode,
+    // so we would need to keep the entire message in memory, which would
+    // consume a lot of RAM.
+    // Instead, we use protobuf to serialize each entry group separately
+    // and provide a version field in the footer for future upgrades.
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    VLOG_DEBUG << "Serialized size: " << serialized.size()
+               << " Before serialization: " << 
_current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        std::string warn_msg = fmt::format("Failed to serialize 
LRUDumpEntryGroupPb");
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    out.write(serialized.data(), serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, filename));
+
+    // Record group metadata
+    ::doris::io::cache::EntryGroupOffsetSizePb* group_info = 
_dump_meta.add_group_offset_size();
+    group_info->set_offset(group_start);
+    group_info->set_size(serialized.size());
+    uint32_t checksum = crc32c::Value(serialized.data(), serialized.size());
+    group_info->set_checksum(checksum);
+
+    // Reset for next group
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+    return Status::OK();
+}
+
+Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,
+                                     std::string& tmp_filename, std::string& 
final_filename,
+                                     size_t& file_size) {
+    // Flush any remaining entries
+    if (_current_dump_group_count > 0) {
+        RETURN_IF_ERROR(flush_current_group(out, tmp_filename));
+    }
+
+    // Write meta information
+    _dump_meta.set_entry_num(entry_num);
+    size_t meta_offset = out.tellp();
+    LOG(INFO) << "dump meta: " << _dump_meta.DebugString();
+    std::string meta_serialized;
+    if (!_dump_meta.SerializeToString(&meta_serialized)) {
+        std::string warn_msg =
+                fmt::format("Failed to serialize LRUDumpMetaPb, file={}", 
tmp_filename);
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+    out.write(meta_serialized.data(), meta_serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    // Write footer
+    Footer footer;
+    footer.meta_offset = htole64(meta_offset); // Explicitly convert to 
little-endian
+    footer.checksum = 0;
+    footer.version = 1;
+    std::memcpy(footer.magic, "DOR", 3);
+
+    std::string footer_str = footer.serialize_as_string();
+    out.write(footer_str.data(), footer_str.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    out.close();
+
+    // Rename tmp to final file
+    try {
+        std::rename(tmp_filename.c_str(), final_filename.c_str());
+        std::remove(tmp_filename.c_str());
+        file_size = std::filesystem::file_size(final_filename);
+    } catch (const std::filesystem::filesystem_error& e) {
+        LOG(WARNING) << "failed to rename " << tmp_filename << " to " << 
final_filename
+                     << " err: " << e.what();
+    }
+
+    _dump_meta.Clear();
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+
+    return Status::OK();
+}
+
+void CacheLRUDumper::dump_queue(const std::string& queue_name) {
+    FileCacheType type = string_to_cache_type(queue_name);
+    if (_recorder->get_lru_queue_update_cnt_from_last_dump(type) >
+        config::file_cache_background_lru_dump_update_cnt_threshold) {
+        LRUQueue& queue = _recorder->get_shadow_queue(type);
+        do_dump_queue(queue, queue_name);
+        _recorder->reset_lru_queue_update_cnt_from_last_dump(type);
+    }
+}
+
+void CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& 
queue_name) {
+    Status st;
+    std::vector<std::tuple<UInt128Wrapper, size_t, size_t>> elements;
+    elements.reserve(config::file_cache_background_lru_dump_tail_record_num);
+
+    {
+        std::lock_guard<std::mutex> lru_log_lock(_recorder->_mutex_lru_log);
+        size_t count = 0;
+        for (const auto& [hash, offset, size] : queue) {
+            if (count++ >= 
config::file_cache_background_lru_dump_tail_record_num) break;
+            elements.emplace_back(hash, offset, size);
+        }
+    }
+
+    // Write to disk
+    int64_t duration_ns = 0;
+    std::uintmax_t file_size = 0;
+    {
+        SCOPED_RAW_TIMER(&duration_ns);
+        std::string tmp_filename =
+                fmt::format("{}/lru_dump_{}.tail.tmp", _mgr->_cache_base_path, 
queue_name);
+        std::string final_filename =
+                fmt::format("{}/lru_dump_{}.tail", _mgr->_cache_base_path, 
queue_name);
+        std::ofstream out(tmp_filename, std::ios::binary);
+        if (out) {
+            LOG(INFO) << "begin dump " << queue_name << " with " << 
elements.size() << " elements";
+            for (const auto& [hash, offset, size] : elements) {
+                RETURN_IF_STATUS_ERROR(st,
+                                       dump_one_lru_entry(out, tmp_filename, 
hash, offset, size));
+            }
+            RETURN_IF_STATUS_ERROR(st, finalize_dump(out, elements.size(), 
tmp_filename,
+                                                     final_filename, 
file_size));
+        } else {
+            LOG(WARNING) << "open lru dump file failed, reason: " << 
tmp_filename
+                         << " failed to create";
+        }
+    }
+    *(_mgr->_lru_dump_latency_us) << (duration_ns / 1000);
+    LOG(INFO) << fmt::format("lru dump for {} size={} element={} time={}us", 
queue_name, file_size,
+                             elements.size(), duration_ns / 1000);
+};
+
+Status CacheLRUDumper::parse_dump_footer(std::ifstream& in, std::string& 
filename,
+                                         size_t& entry_num) {
+    size_t file_size = std::filesystem::file_size(filename);
+
+    // Read footer
+    Footer footer;
+    size_t footer_size = sizeof(footer);
+    if (file_size < footer_size) {
+        std::string warn_msg = std::string(fmt::format(
+                "LRU dump file too small to contain footer, file={}, skip 
restore", filename));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    in.seekg(-footer_size, std::ios::end);
+    std::string footer_str(footer_size, '\0');
+    in.read(&footer_str[0], footer_size);
+    RETURN_IF_ERROR(check_ifstream_status(in, filename));
+
+    if (!footer.deserialize_from_string(footer_str)) {
+        std::string warn_msg = std::string(
+                fmt::format("Failed to deserialize footer, file={}, skip 
restore", filename));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    // Convert from little-endian to host byte order
+    footer.meta_offset = le64toh(footer.meta_offset);
+
+    // Validate footer
+    if (footer.version != 1 || std::string(footer.magic, 3) != "DOR") {
+        std::string warn_msg = std::string(fmt::format(
+                "LRU dump file invalid footer format, file={}, skip restore", 
filename));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    // Read meta
+    in.seekg(footer.meta_offset, std::ios::beg);
+    size_t meta_size = file_size - footer.meta_offset - footer_size;
+    if (meta_size <= 0) {
+        std::string warn_msg = std::string(
+                fmt::format("LRU dump file invalid meta size, file={}, skip 
restore", filename));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+    std::string meta_serialized(meta_size, '\0');
+    in.read(&meta_serialized[0], meta_serialized.size());
+    RETURN_IF_ERROR(check_ifstream_status(in, filename));
+    _parse_meta.Clear();
+    _current_parse_group.Clear();
+    if (!_parse_meta.ParseFromString(meta_serialized)) {
+        std::string warn_msg = std::string(
+                fmt::format("LRU dump file meta parse failed, file={}, skip 
restore", filename));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+    VLOG_DEBUG << "parse meta: " << _parse_meta.DebugString();
+
+    entry_num = _parse_meta.entry_num();
+    return Status::OK();
+}
+
+Status CacheLRUDumper::parse_one_lru_entry(std::ifstream& in, std::string& 
filename,
+                                           UInt128Wrapper& hash, size_t& 
offset, size_t& size) {
+    // Read next group if current is empty
+    if (_current_parse_group.entries_size() == 0) {
+        if (_parse_meta.group_offset_size_size() == 0) {
+            return Status::EndOfFile("No more entries");
+        }
+
+        auto group_info = _parse_meta.group_offset_size(0);
+        in.seekg(group_info.offset(), std::ios::beg);
+        std::string group_serialized(group_info.size(), '\0');
+        in.read(&group_serialized[0], group_serialized.size());
+        RETURN_IF_ERROR(check_ifstream_status(in, filename));
+        uint32_t checksum = crc32c::Value(group_serialized.data(), 
group_serialized.size());
+        if (checksum != group_info.checksum()) {
+            std::string warn_msg =
+                    fmt::format("restore lru failed as checksum not match, 
file={}", filename);
+            LOG(WARNING) << warn_msg;
+            return Status::InternalError(warn_msg);
+        }
+        if (!_current_parse_group.ParseFromString(group_serialized)) {
+            std::string warn_msg =
+                    fmt::format("restore lru failed to parse group, file={}", 
filename);
+            LOG(WARNING) << warn_msg;
+            return Status::InternalError(warn_msg);
+        }
+
+        // Remove processed group info
+        
_parse_meta.mutable_group_offset_size()->erase(_parse_meta.group_offset_size().begin());
+    }
+
+    // Get next entry from current group
+    VLOG_DEBUG << "After deserialization: " << 
_current_parse_group.DebugString();
+    auto entry = _current_parse_group.entries(0);
+    hash = UInt128Wrapper((static_cast<uint128_t>(entry.hash().high()) << 64) 
| entry.hash().low());
+    offset = entry.offset();
+    size = entry.size();
+
+    // Remove processed entry
+    
_current_parse_group.mutable_entries()->erase(_current_parse_group.entries().begin());
+    return Status::OK();
+}
+
+void CacheLRUDumper::restore_queue(LRUQueue& queue, const std::string& 
queue_name,
+                                   std::lock_guard<std::mutex>& cache_lock) {
+    Status st;
+    std::string filename = fmt::format("{}/lru_dump_{}.tail", 
_mgr->_cache_base_path, queue_name);
+    std::ifstream in(filename, std::ios::binary);
+    int64_t duration_ns = 0;
+    if (in) {
+        LOG(INFO) << "lru dump file is founded for " << queue_name << ". 
starting lru restore.";
+
+        SCOPED_RAW_TIMER(&duration_ns);
+        size_t entry_num = 0;
+        RETURN_IF_STATUS_ERROR(st, parse_dump_footer(in, filename, entry_num));
+        LOG(INFO) << "lru dump file for " << queue_name << " has " << 
entry_num << " entries.";
+        in.seekg(0, std::ios::beg);
+        UInt128Wrapper hash;
+        size_t offset, size;
+        for (int i = 0; i < entry_num; ++i) {
+            RETURN_IF_STATUS_ERROR(st, parse_one_lru_entry(in, filename, hash, 
offset, size));
+            CacheContext ctx;
+            if (queue_name == "ttl") {
+                ctx.cache_type = FileCacheType::TTL;
+                // TODO(zhengyu): we haven't persisted the expiration time yet, so use a 3h default.
+                // There are multiple places where we could correct this fake 3h ttl, e.g.:
+                // 1. during load_cache_info_into_memory (this will overwrite the ttl of the async load)
+                // 2. after restoring, use sync_meta to modify the ttl
+                // However, I plan not to do this in this commit but to figure out a more elegant way
+                // after the ttl expiration time is moved from file name encoding to rocksdb persistency.
+                ctx.expiration_time = 10800;
+            } else if (queue_name == "index") {
+                ctx.cache_type = FileCacheType::INDEX;
+            } else if (queue_name == "normal") {
+                ctx.cache_type = FileCacheType::NORMAL;
+            } else if (queue_name == "disposable") {
+                ctx.cache_type = FileCacheType::DISPOSABLE;
+            } else {
+                LOG_WARNING("unknown queue type for lru restore, skip");
+                DCHECK(false);
+                return;
+            }
+            // TODO(zhengyu): we don't use stats yet; see if this will cause any problems
+            _mgr->add_cell(hash, ctx, offset, size, 
FileBlock::State::DOWNLOADED, cache_lock);
+        }
+        in.close();
+    } else {
+        LOG(INFO) << "no lru dump file is founded for " << queue_name;
+    }
+    LOG(INFO) << "lru restore time costs: " << (duration_ns / 1000) << "us.";
+};
+
+void CacheLRUDumper::remove_lru_dump_files() {
+    std::vector<std::string> queue_names = {"disposable", "index", "normal", 
"ttl"};
+    for (const auto& queue_name : queue_names) {
+        std::string filename =
+                fmt::format("{}/lru_dump_{}.tail", _mgr->_cache_base_path, 
queue_name);
+        if (std::filesystem::exists(filename)) {
+            std::filesystem::remove(filename);
+        }
+    }
+}
+
+} // end of namespace doris::io
diff --git a/be/src/io/cache/cache_lru_dumper.h 
b/be/src/io/cache/cache_lru_dumper.h
new file mode 100644
index 00000000000..801ed577de2
--- /dev/null
+++ b/be/src/io/cache/cache_lru_dumper.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <cstring>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <tuple>
+#include <vector>
+
+#include "gen_cpp/file_cache.pb.h"
+#include "io/cache/file_cache_common.h"
+
+namespace doris::io {
+class LRUQueue;
+class LRUQueueRecorder;
+
+class CacheLRUDumper {
+public:
+    CacheLRUDumper(BlockFileCache* mgr, LRUQueueRecorder* recorder)
+            : _mgr(mgr), _recorder(recorder) {};
+    void dump_queue(const std::string& queue_name);
+    void restore_queue(LRUQueue& queue, const std::string& queue_name,
+                       std::lock_guard<std::mutex>& cache_lock);
+    void remove_lru_dump_files();
+
+private:
+    void do_dump_queue(LRUQueue& queue, const std::string& queue_name);
+    Status check_ofstream_status(std::ofstream& out, std::string& filename);
+    Status check_ifstream_status(std::ifstream& in, std::string& filename);
+    Status dump_one_lru_entry(std::ofstream& out, std::string& filename, const 
UInt128Wrapper& hash,
+                              size_t offset, size_t size);
+    Status finalize_dump(std::ofstream& out, size_t entry_num, std::string& 
tmp_filename,
+                         std::string& final_filename, size_t& file_size);
+    Status parse_dump_footer(std::ifstream& in, std::string& filename, size_t& 
entry_num);
+    Status parse_one_lru_entry(std::ifstream& in, std::string& filename, 
UInt128Wrapper& hash,
+                               size_t& offset, size_t& size);
+    Status flush_current_group(std::ofstream& out, std::string& filename);
+
+    struct Footer {
+        size_t meta_offset;
+        uint32_t checksum;
+        uint8_t version;
+        char magic[3];
+
+        std::string serialize_as_string() const;
+        bool deserialize_from_string(const std::string& data);
+    } __attribute__((packed));
+
+private:
+    // For dumping
+    doris::io::cache::LRUDumpEntryGroupPb _current_dump_group;
+    doris::io::cache::LRUDumpMetaPb _dump_meta;
+    size_t _current_dump_group_count = 0;
+
+    // For parsing
+    doris::io::cache::LRUDumpEntryGroupPb _current_parse_group;
+    doris::io::cache::LRUDumpMetaPb _parse_meta;
+
+    BlockFileCache* _mgr;
+    LRUQueueRecorder* _recorder;
+};
+} // namespace doris::io
\ No newline at end of file
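
For readers skimming the patch, here is a minimal sketch of how the new CacheLRUDumper API
is intended to be driven, assuming an already-constructed BlockFileCache and LRUQueueRecorder
and using a plain std::mutex to stand in for the cache's own lock; the helper function and its
parameters are illustrative only and are not part of this commit:

    #include <mutex>

    #include "io/cache/cache_lru_dumper.h"
    #include "io/cache/file_cache_common.h"

    namespace doris::io {
    // Hypothetical wiring example; the real call sites live inside BlockFileCache.
    void example_dump_and_restore(BlockFileCache* cache, LRUQueueRecorder* recorder,
                                  LRUQueue& normal_queue, std::mutex& cache_mutex) {
        CacheLRUDumper dumper(cache, recorder);
        // Periodically persist the tail of the queue to
        // <cache_base_path>/lru_dump_normal.tail.
        dumper.dump_queue("normal");
        // On startup, rebuild the in-memory LRU order from the dump file.
        {
            std::lock_guard<std::mutex> cache_lock(cache_mutex);
            dumper.restore_queue(normal_queue, "normal", cache_lock);
        }
        // Remove stale dump files, e.g. before clearing the cache directory.
        dumper.remove_lru_dump_files();
    }
    } // namespace doris::io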
diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp
index 06f58730296..731c1d311e6 100644
--- a/be/src/io/cache/file_block.cpp
+++ b/be/src/io/cache/file_block.cpp
@@ -172,8 +172,8 @@ Status 
FileBlock::change_cache_type_between_ttl_and_others(FileCacheType new_typ
     if (!expr) {
         LOG(WARNING) << "none of the cache type is TTL"
                      << ", hash: " << _key.hash.to_string() << ", offset: " << 
_key.offset
-                     << ", new type: " << 
BlockFileCache::cache_type_to_string(new_type)
-                     << ", old type: " << 
BlockFileCache::cache_type_to_string(_key.meta.type);
+                     << ", new type: " << cache_type_to_string(new_type)
+                     << ", old type: " << cache_type_to_string(_key.meta.type);
     }
     DCHECK(expr);
 
@@ -189,8 +189,8 @@ Status 
FileBlock::change_cache_type_between_normal_and_index(FileCacheType new_t
     if (!expr) {
         LOG(WARNING) << "one of the cache type is TTL"
                      << ", hash: " << _key.hash.to_string() << ", offset: " << 
_key.offset
-                     << ", new type: " << 
BlockFileCache::cache_type_to_string(new_type)
-                     << ", old type: " << 
BlockFileCache::cache_type_to_string(_key.meta.type);
+                     << ", new type: " << cache_type_to_string(new_type)
+                     << ", old type: " << cache_type_to_string(_key.meta.type);
     }
     DCHECK(expr);
     if (_key.meta.type == FileCacheType::TTL || new_type == _key.meta.type) {
diff --git a/be/src/io/cache/file_cache_common.cpp 
b/be/src/io/cache/file_cache_common.cpp
index 56525425f75..64faec3beb3 100644
--- a/be/src/io/cache/file_cache_common.cpp
+++ b/be/src/io/cache/file_cache_common.cpp
@@ -26,6 +26,60 @@
 
 namespace doris::io {
 
+std::string cache_type_to_surfix(FileCacheType type) {
+    switch (type) {
+    case FileCacheType::INDEX:
+        return "_idx";
+    case FileCacheType::DISPOSABLE:
+        return "_disposable";
+    case FileCacheType::NORMAL:
+        return "";
+    case FileCacheType::TTL:
+        return "_ttl";
+    }
+    return "";
+}
+
+FileCacheType surfix_to_cache_type(const std::string& str) {
+    if (str == "idx") {
+        return FileCacheType::INDEX;
+    } else if (str == "disposable") {
+        return FileCacheType::DISPOSABLE;
+    } else if (str == "ttl") {
+        return FileCacheType::TTL;
+    }
+    DCHECK(false) << "The string is " << str;
+    return FileCacheType::DISPOSABLE;
+}
+
+FileCacheType string_to_cache_type(const std::string& str) {
+    if (str == "normal") {
+        return FileCacheType::NORMAL;
+    } else if (str == "index") {
+        return FileCacheType::INDEX;
+    } else if (str == "disposable") {
+        return FileCacheType::DISPOSABLE;
+    } else if (str == "ttl") {
+        return FileCacheType::TTL;
+    }
+    DCHECK(false) << "The string is " << str;
+    return FileCacheType::NORMAL;
+}
+std::string cache_type_to_string(FileCacheType type) {
+    switch (type) {
+    case FileCacheType::INDEX:
+        return "index";
+    case FileCacheType::DISPOSABLE:
+        return "disposable";
+    case FileCacheType::NORMAL:
+        return "normal";
+    case FileCacheType::TTL:
+        return "ttl";
+    }
+    DCHECK(false) << "unknown type: " << type;
+    return "normal";
+}
+
 std::string FileCacheSettings::to_string() const {
     std::stringstream ss;
     ss << "capacity: " << capacity << ", max_file_block_size: " << 
max_file_block_size
@@ -89,4 +143,7 @@ FileBlocksHolderPtr 
FileCacheAllocatorBuilder::allocate_cache_holder(size_t offs
     return std::make_unique<FileBlocksHolder>(std::move(holder));
 }
 
+template size_t LRUQueue::get_capacity(std::lock_guard<std::mutex>& 
cache_lock) const;
+template void LRUQueue::remove(Iterator queue_it, std::lock_guard<std::mutex>& 
cache_lock);
+
 } // namespace doris::io
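
As a quick reference for the helpers that moved out of BlockFileCache into free functions,
a small sketch of the two naming schemes they translate (file-name suffixes vs. queue names);
the assertions simply mirror the switch statements above and are not part of this commit:

    #include <cassert>

    #include "io/cache/file_cache_common.h"

    void example_cache_type_helpers() {
        using namespace doris::io;
        // file-name suffix helpers ("_idx", "_disposable", "_ttl", "" for NORMAL)
        assert(cache_type_to_surfix(FileCacheType::INDEX) == "_idx");
        assert(surfix_to_cache_type("idx") == FileCacheType::INDEX);
        // queue-name helpers ("index", "normal", "disposable", "ttl")
        assert(cache_type_to_string(FileCacheType::NORMAL) == "normal");
        assert(string_to_cache_type("ttl") == FileCacheType::TTL);
    }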
diff --git a/be/src/io/cache/file_cache_common.h 
b/be/src/io/cache/file_cache_common.h
index 6e9396fb11a..f9ac525d0be 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -40,6 +40,12 @@ enum FileCacheType {
     TTL = 3,
 };
 
+std::string cache_type_to_surfix(FileCacheType type);
+FileCacheType surfix_to_cache_type(const std::string& str);
+
+FileCacheType string_to_cache_type(const std::string& str);
+std::string cache_type_to_string(FileCacheType type);
+
 struct UInt128Wrapper {
     uint128_t value_;
     [[nodiscard]] std::string to_string() const;
@@ -48,6 +54,9 @@ struct UInt128Wrapper {
     explicit UInt128Wrapper(const uint128_t& value) : value_(value) {}
 
     bool operator==(const UInt128Wrapper& other) const { return value_ == 
other.value_; }
+
+    uint64_t high() const { return static_cast<uint64_t>(value_ >> 64); }
+    uint64_t low() const { return static_cast<uint64_t>(value_); }
 };
 
 struct ReadStatistics {
@@ -152,4 +161,94 @@ struct CacheContext {
     ReadStatistics* stats;
 };
 
+template <class Lock>
+concept IsXLock = std::same_as<Lock, std::lock_guard<std::mutex>> ||
+                  std::same_as<Lock, std::unique_lock<std::mutex>>;
+
+class LRUQueue {
+public:
+    LRUQueue() = default;
+    LRUQueue(size_t max_size, size_t max_element_size, int64_t 
hot_data_interval)
+            : max_size(max_size),
+              max_element_size(max_element_size),
+              hot_data_interval(hot_data_interval) {}
+
+    struct HashFileKeyAndOffset {
+        std::size_t operator()(const std::pair<UInt128Wrapper, size_t>& pair) 
const {
+            return KeyHash()(pair.first) + pair.second;
+        }
+    };
+
+    struct FileKeyAndOffset {
+        UInt128Wrapper hash;
+        size_t offset;
+        size_t size;
+
+        FileKeyAndOffset(const UInt128Wrapper& hash, size_t offset, size_t 
size)
+                : hash(hash), offset(offset), size(size) {}
+    };
+
+    using Iterator = typename std::list<FileKeyAndOffset>::iterator;
+
+    size_t get_max_size() const { return max_size; }
+    size_t get_max_element_size() const { return max_element_size; }
+
+    template <class T>
+        requires IsXLock<T>
+    size_t get_capacity(T& /* cache_lock */) const {
+        return cache_size;
+    }
+
+    size_t get_capacity_unsafe() const { return cache_size; }
+
+    size_t get_elements_num_unsafe() const { return queue.size(); }
+
+    size_t get_elements_num(std::lock_guard<std::mutex>& /* cache_lock */) 
const {
+        return queue.size();
+    }
+
+    Iterator add(const UInt128Wrapper& hash, size_t offset, size_t size,
+                 std::lock_guard<std::mutex>& cache_lock);
+    template <class T>
+        requires IsXLock<T>
+    void remove(Iterator queue_it, T& /* cache_lock */) {
+        cache_size -= queue_it->size;
+        map.erase(std::make_pair(queue_it->hash, queue_it->offset));
+        queue.erase(queue_it);
+    }
+
+    void move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& 
cache_lock);
+
+    std::string to_string(std::lock_guard<std::mutex>& cache_lock) const;
+
+    bool contains(const UInt128Wrapper& hash, size_t offset,
+                  std::lock_guard<std::mutex>& cache_lock) const;
+
+    Iterator begin() { return queue.begin(); }
+
+    Iterator end() { return queue.end(); }
+
+    void remove_all(std::lock_guard<std::mutex>& cache_lock);
+
+    Iterator get(const UInt128Wrapper& hash, size_t offset,
+                 std::lock_guard<std::mutex>& /* cache_lock */) const;
+
+    int64_t get_hot_data_interval() const { return hot_data_interval; }
+
+    void clear(std::lock_guard<std::mutex>& cache_lock) {
+        queue.clear();
+        map.clear();
+        cache_size = 0;
+    }
+
+    size_t levenshtein_distance_from(LRUQueue& base, 
std::lock_guard<std::mutex>& cache_lock);
+
+    size_t max_size;
+    size_t max_element_size;
+    std::list<FileKeyAndOffset> queue;
+    std::unordered_map<std::pair<UInt128Wrapper, size_t>, Iterator, 
HashFileKeyAndOffset> map;
+    size_t cache_size = 0;
+    int64_t hot_data_interval {0};
+};
+
 } // namespace doris::io
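
Since LRUQueue is now declared in this shared header (so that the dumper and the recorder can
build shadow queues), a minimal sketch of its lock-guarded API; the std::mutex below merely
stands in for the file cache's own lock and the example function is illustrative only:

    #include <mutex>

    #include "io/cache/file_cache_common.h"

    void example_lru_queue_ops() {
        using namespace doris::io;
        LRUQueue queue(/*max_size=*/1 << 20, /*max_element_size=*/1024,
                       /*hot_data_interval=*/0);
        std::mutex mtx;
        std::lock_guard<std::mutex> cache_lock(mtx);

        UInt128Wrapper hash(42);
        auto it = queue.add(hash, /*offset=*/0, /*size=*/4096, cache_lock);
        queue.move_to_end(it, cache_lock); // mark as most recently used
        if (queue.contains(hash, /*offset=*/0, cache_lock)) {
            queue.remove(it, cache_lock);  // accepts lock_guard or unique_lock (IsXLock)
        }
    }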
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp 
b/be/src/io/cache/fs_file_cache_storage.cpp
index ecb594e14a2..bb24b476501 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -254,8 +254,8 @@ Status FSFileCacheStorage::change_key_meta_type(const 
FileCacheKey& key, const F
         if (!expr) {
             LOG(WARNING) << "TTL type file dose not need to change the suffix"
                          << " key=" << key.hash.to_string() << " offset=" << 
key.offset
-                         << " old_type=" << 
BlockFileCache::cache_type_to_string(key.meta.type)
-                         << " new_type=" << 
BlockFileCache::cache_type_to_string(type);
+                         << " old_type=" << cache_type_to_string(key.meta.type)
+                         << " new_type=" << cache_type_to_string(type);
         }
         DCHECK(expr);
         std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
@@ -288,7 +288,7 @@ std::string 
FSFileCacheStorage::get_path_in_local_cache(const std::string& dir,
     } else if (type == FileCacheType::TTL) {
         return Path(dir) / std::to_string(offset);
     } else {
-        return Path(dir) / (std::to_string(offset) + 
BlockFileCache::cache_type_to_string(type));
+        return Path(dir) / (std::to_string(offset) + 
cache_type_to_surfix(type));
     }
 }
 
@@ -297,7 +297,7 @@ std::string 
FSFileCacheStorage::get_path_in_local_cache_old_ttl_format(const std
                                                                        
FileCacheType type,
                                                                        bool 
is_tmp) {
     DCHECK(type == FileCacheType::TTL);
-    return Path(dir) / (std::to_string(offset) + 
BlockFileCache::cache_type_to_string(type));
+    return Path(dir) / (std::to_string(offset) + cache_type_to_surfix(type));
 }
 
 std::vector<std::string> 
FSFileCacheStorage::get_path_in_local_cache_all_candidates(
@@ -606,7 +606,7 @@ Status 
FSFileCacheStorage::parse_filename_suffix_to_cache_type(
             if (suffix == "tmp") [[unlikely]] {
                 *is_tmp = true;
             } else {
-                *cache_type = BlockFileCache::string_to_cache_type(suffix);
+                *cache_type = surfix_to_cache_type(suffix);
             }
         }
     } catch (...) {
@@ -662,6 +662,7 @@ void 
FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
         auto f = [&](const BatchLoadArgs& args) {
             // in async load mode, a cell may be added twice.
             if (_mgr->_files.contains(args.hash) && 
_mgr->_files[args.hash].contains(args.offset)) {
+                // TODO(zhengyu): update type & expiration if needed
                 return;
             }
             // if the file is tmp, it means it is the old file and it should 
be removed
diff --git a/be/src/io/cache/lru_queue_recorder.cpp 
b/be/src/io/cache/lru_queue_recorder.cpp
new file mode 100644
index 00000000000..12da29d42b7
--- /dev/null
+++ b/be/src/io/cache/lru_queue_recorder.cpp
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/lru_queue_recorder.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/file_cache_common.h"
+
+namespace doris::io {
+
+void LRUQueueRecorder::record_queue_event(FileCacheType type, CacheLRULogType 
log_type,
+                                          const UInt128Wrapper hash, const 
size_t offset,
+                                          const size_t size) {
+    CacheLRULogQueue& log_queue = get_lru_log_queue(type);
+    log_queue.push_back(std::make_unique<CacheLRULog>(log_type, hash, offset, 
size));
+    ++(_lru_queue_update_cnt_from_last_dump[type]);
+}
+
+void LRUQueueRecorder::replay_queue_event(FileCacheType type) {
+    // we don't need the real cache lock for the shadow queue, but we do need a lock to prevent read/write contention
+    CacheLRULogQueue& log_queue = get_lru_log_queue(type);
+    LRUQueue& shadow_queue = get_shadow_queue(type);
+
+    std::lock_guard<std::mutex> lru_log_lock(_mutex_lru_log);
+    while (!log_queue.empty()) {
+        auto log = std::move(log_queue.front());
+        log_queue.pop_front();
+        try {
+            switch (log->type) {
+            case CacheLRULogType::ADD: {
+                shadow_queue.add(log->hash, log->offset, log->size, 
lru_log_lock);
+                break;
+            }
+            case CacheLRULogType::REMOVE: {
+                auto it = shadow_queue.get(log->hash, log->offset, 
lru_log_lock);
+                if (it != shadow_queue.end()) {
+                    shadow_queue.remove(it, lru_log_lock);
+                } else {
+                    LOG(WARNING) << "REMOVE failed, doesn't exist in shadow 
queue";
+                }
+                break;
+            }
+            case CacheLRULogType::MOVETOBACK: {
+                auto it = shadow_queue.get(log->hash, log->offset, 
lru_log_lock);
+                if (it != shadow_queue.end()) {
+                    shadow_queue.move_to_end(it, lru_log_lock);
+                } else {
+                    LOG(WARNING) << "MOVETOBACK failed, doesn't exist in 
shadow queue";
+                }
+                break;
+            }
+            default:
+                LOG(WARNING) << "Unknown CacheLRULogType: " << 
static_cast<int>(log->type);
+                break;
+            }
+        } catch (const std::exception& e) {
+            LOG(WARNING) << "Failed to replay queue event: " << e.what();
+        }
+    }
+}
+
+// we evaluate the diff between two queues by calculating how many operations are
+// needed to transform one into the other (Levenshtein distance)
+// NOTE: HEAVY calculation with cache lock, only for debugging
+void LRUQueueRecorder::evaluate_queue_diff(LRUQueue& base, std::string name,
+                                           std::lock_guard<std::mutex>& 
cache_lock) {
+    FileCacheType type = string_to_cache_type(name);
+    LRUQueue& target = get_shadow_queue(type);
+    size_t distance = target.levenshtein_distance_from(base, cache_lock);
+    *(_mgr->_shadow_queue_levenshtein_distance) << distance;
+    if (distance > 20) {
+        LOG(WARNING) << name << " shadow queue is different from real queue";
+    }
+}
+
+LRUQueue& LRUQueueRecorder::get_shadow_queue(FileCacheType type) {
+    switch (type) {
+    case FileCacheType::INDEX:
+        return _shadow_index_queue;
+    case FileCacheType::DISPOSABLE:
+        return _shadow_disposable_queue;
+    case FileCacheType::NORMAL:
+        return _shadow_normal_queue;
+    case FileCacheType::TTL:
+        return _shadow_ttl_queue;
+    default:
+        LOG(WARNING) << "invalid shadow queue type";
+        DCHECK(false);
+    }
+    return _shadow_normal_queue;
+}
+
+CacheLRULogQueue& LRUQueueRecorder::get_lru_log_queue(FileCacheType type) {
+    switch (type) {
+    case FileCacheType::INDEX:
+        return _index_lru_log_queue;
+    case FileCacheType::DISPOSABLE:
+        return _disposable_lru_log_queue;
+    case FileCacheType::NORMAL:
+        return _normal_lru_log_queue;
+    case FileCacheType::TTL:
+        return _ttl_lru_log_queue;
+    default:
+        LOG(WARNING) << "invalid lru log queue type";
+        DCHECK(false);
+    }
+    return _normal_lru_log_queue;
+}
+
+size_t LRUQueueRecorder::get_lru_queue_update_cnt_from_last_dump(FileCacheType 
type) {
+    return _lru_queue_update_cnt_from_last_dump[type];
+}
+
+void LRUQueueRecorder::reset_lru_queue_update_cnt_from_last_dump(FileCacheType 
type) {
+    _lru_queue_update_cnt_from_last_dump[type] = 0;
+}
+
+} // end of namespace doris::io
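
To make the record/replay split above concrete, a small sketch of the intended flow, assuming
an already-constructed LRUQueueRecorder (in the real code the cache records events while
holding the cache lock, and a background thread replays them into the shadow queues before the
dumper serializes them); the example function is illustrative only:

    #include "io/cache/file_cache_common.h"
    #include "io/cache/lru_queue_recorder.h"

    void example_record_and_replay(doris::io::LRUQueueRecorder& recorder) {
        using namespace doris::io;
        UInt128Wrapper hash(1);
        // Record mutations of the real "normal" queue as lightweight log entries.
        recorder.record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD,
                                    hash, /*offset=*/0, /*size=*/4096);
        recorder.record_queue_event(FileCacheType::NORMAL, CacheLRULogType::MOVETOBACK,
                                    hash, /*offset=*/0, /*size=*/4096);
        // Apply the pending logs to the shadow queue used by the LRU dumper.
        recorder.replay_queue_event(FileCacheType::NORMAL);
        size_t n = recorder.get_shadow_queue(FileCacheType::NORMAL)
                           .get_elements_num_unsafe(); // now 1
        (void)n;
    }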
diff --git a/be/src/io/cache/lru_queue_recorder.h 
b/be/src/io/cache/lru_queue_recorder.h
new file mode 100644
index 00000000000..dceef7a493c
--- /dev/null
+++ b/be/src/io/cache/lru_queue_recorder.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "io/cache/file_cache_common.h"
+
+namespace doris::io {
+
+class LRUQueue;
+
+enum class CacheLRULogType {
+    ADD = 0,
+    REMOVE = 1,
+    MOVETOBACK = 2,
+    INVALID = 3,
+};
+
+struct CacheLRULog {
+    CacheLRULogType type = CacheLRULogType::INVALID;
+    UInt128Wrapper hash;
+    size_t offset;
+    size_t size;
+
+    CacheLRULog(CacheLRULogType t, UInt128Wrapper h, size_t o, size_t s)
+            : type(t), hash(h), offset(o), size(s) {}
+};
+
+using CacheLRULogQueue = std::list<std::unique_ptr<CacheLRULog>>;
+
+class LRUQueueRecorder {
+public:
+    LRUQueueRecorder(BlockFileCache* mgr) : _mgr(mgr) {
+        _lru_queue_update_cnt_from_last_dump[FileCacheType::DISPOSABLE] = 0;
+        _lru_queue_update_cnt_from_last_dump[FileCacheType::NORMAL] = 0;
+        _lru_queue_update_cnt_from_last_dump[FileCacheType::INDEX] = 0;
+        _lru_queue_update_cnt_from_last_dump[FileCacheType::TTL] = 0;
+    }
+    void record_queue_event(FileCacheType type, CacheLRULogType log_type, 
const UInt128Wrapper hash,
+                            const size_t offset, const size_t size);
+    void replay_queue_event(FileCacheType type);
+    void evaluate_queue_diff(LRUQueue& base, std::string name,
+                             std::lock_guard<std::mutex>& cache_lock);
+    size_t get_lru_queue_update_cnt_from_last_dump(FileCacheType type);
+    void reset_lru_queue_update_cnt_from_last_dump(FileCacheType type);
+
+    CacheLRULogQueue& get_lru_log_queue(FileCacheType type);
+    LRUQueue& get_shadow_queue(FileCacheType type);
+
+public:
+    std::mutex _mutex_lru_log;
+
+private:
+    LRUQueue _shadow_index_queue;
+    LRUQueue _shadow_normal_queue;
+    LRUQueue _shadow_disposable_queue;
+    LRUQueue _shadow_ttl_queue;
+
+    CacheLRULogQueue _ttl_lru_log_queue;
+    CacheLRULogQueue _index_lru_log_queue;
+    CacheLRULogQueue _normal_lru_log_queue;
+    CacheLRULogQueue _disposable_lru_log_queue;
+
+    std::unordered_map<FileCacheType, size_t> 
_lru_queue_update_cnt_from_last_dump;
+
+    BlockFileCache* _mgr;
+};
+
+} // namespace doris::io
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index a01bff87bc9..72e96ed3027 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -167,6 +167,8 @@ public:
 
     static bool ready() { return _s_ready.load(std::memory_order_acquire); }
     static bool tracking_memory() { return 
_s_tracking_memory.load(std::memory_order_acquire); }
+    static bool get_is_upgrading() { return 
_s_upgrading.load(std::memory_order_acquire); }
+    static void set_is_upgrading() { _s_upgrading = true; }
     const std::string& token() const;
     ExternalScanContextMgr* external_scan_context_mgr() { return 
_external_scan_context_mgr; }
     vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; }
@@ -394,6 +396,7 @@ private:
     inline static std::atomic_bool _s_tracking_memory {false};
     std::vector<StorePath> _store_paths;
     std::vector<StorePath> _spill_store_paths;
+    inline static std::atomic_bool _s_upgrading {false};
 
     io::FileCacheFactory* _file_cache_factory = nullptr;
     UserFunctionCache* _user_function_cache = nullptr;
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index fcec0222e5e..fd00f1529aa 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -18,71 +18,13 @@
 // 
https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/tests/gtest_lru_file_cache.cpp
 // and modified by Doris
 
-#include <gen_cpp/Types_types.h>
-#include <gtest/gtest-message.h>
-#include <gtest/gtest-test-part.h>
-#include <stddef.h>
-
-#include "runtime/thread_context.h"
-#if defined(__APPLE__)
-#include <sys/mount.h>
-#else
-#include <sys/statfs.h>
-#endif
-
-// IWYU pragma: no_include <bits/chrono.h>
-#include <gtest/gtest.h>
-
-#include <chrono> // IWYU pragma: keep
-#include <condition_variable>
-#include <filesystem>
-#include <fstream>
-#include <iostream>
-#include <list>
-#include <memory>
-#include <mutex>
-#include <optional>
-#include <random>
-#include <ranges>
-#include <stdexcept>
-#include <string>
-#include <thread>
-#include <vector>
-
-#include "common/config.h"
-#include "cpp/sync_point.h"
-#include "gtest/gtest_pred_impl.h"
-#include "io/cache/block_file_cache.h"
-#include "io/cache/block_file_cache_factory.h"
-#include "io/cache/block_file_cache_profile.h"
-#include "io/cache/cached_remote_file_reader.h"
-#include "io/cache/file_block.h"
-#include "io/cache/file_cache_common.h"
-#include "io/cache/fs_file_cache_storage.h"
-#include "io/fs/path.h"
-#include "olap/options.h"
-#include "runtime/exec_env.h"
-#include "util/slice.h"
-#include "util/time.h"
-
+#include "block_file_cache_test_common.h"
 namespace doris::io {
 
-extern int disk_used_percentage(const std::string& path, std::pair<int, int>* 
percent);
-
-namespace fs = std::filesystem;
-
 fs::path caches_dir = fs::current_path() / "lru_cache_test";
 std::string cache_base_path = caches_dir / "cache1" / "";
 std::string tmp_file = caches_dir / "tmp_file";
 
-constexpr unsigned long long operator"" _mb(unsigned long long m) {
-    return m * 1024 * 1024;
-}
-
-constexpr unsigned long long operator"" _kb(unsigned long long m) {
-    return m * 1024;
-}
-
 void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr 
file_block,
                   const io::FileBlock::Range& expected_range, 
io::FileBlock::State expected_state) {
     auto range = file_block->range();
@@ -96,7 +38,7 @@ std::vector<io::FileBlockSPtr> fromHolder(const 
io::FileBlocksHolder& holder) {
     return std::vector<io::FileBlockSPtr>(holder.file_blocks.begin(), 
holder.file_blocks.end());
 }
 
-void download(io::FileBlockSPtr file_block, size_t size = 0) {
+void download(io::FileBlockSPtr file_block, size_t size) {
     const auto& hash = file_block->get_hash_value();
     if (size == 0) {
         size = file_block->range().size();
@@ -115,7 +57,7 @@ void download(io::FileBlockSPtr file_block, size_t size = 0) 
{
     ASSERT_TRUE(fs::exists(subdir));
 }
 
-void download_into_memory(io::FileBlockSPtr file_block, size_t size = 0) {
+void download_into_memory(io::FileBlockSPtr file_block, size_t size) {
     if (size == 0) {
         size = file_block->range().size();
     }
@@ -140,101 +82,6 @@ void complete_into_memory(const io::FileBlocksHolder& 
holder) {
     }
 }
 
-class BlockFileCacheTest : public testing::Test {
-public:
-    static void SetUpTestSuite() {
-        config::file_cache_enter_disk_resource_limit_mode_percent = 99;
-        config::enable_evict_file_cache_in_advance = false; // disable evict in
-                                                            // advance for most
-                                                            // cases for simple
-                                                            // verification
-        bool exists {false};
-        ASSERT_TRUE(global_local_filesystem()->exists(caches_dir, 
&exists).ok());
-        if (!exists) {
-            
ASSERT_TRUE(global_local_filesystem()->create_directory(caches_dir).ok());
-        }
-        ASSERT_TRUE(global_local_filesystem()->exists(tmp_file, &exists).ok());
-        if (!exists) {
-            FileWriterPtr writer;
-            ASSERT_TRUE(global_local_filesystem()->create_file(tmp_file, 
&writer).ok());
-            for (int i = 0; i < 10; i++) {
-                std::string data(1_mb, '0' + i);
-                ASSERT_TRUE(writer->append(Slice(data.data(), 
data.size())).ok());
-            }
-            std::string data(1, '0');
-            ASSERT_TRUE(writer->append(Slice(data.data(), data.size())).ok());
-            ASSERT_TRUE(writer->close().ok());
-        }
-        ExecEnv::GetInstance()->_file_cache_factory = factory.get();
-        ExecEnv::GetInstance()->_file_cache_open_fd_cache = 
std::make_unique<io::FDCache>();
-    }
-    static void TearDownTestSuite() {
-        config::file_cache_enter_disk_resource_limit_mode_percent = 99;
-        ExecEnv::GetInstance()->_file_cache_open_fd_cache.reset(nullptr);
-    }
-
-private:
-    inline static std::unique_ptr<FileCacheFactory> factory = 
std::make_unique<FileCacheFactory>();
-};
-
-TEST_F(BlockFileCacheTest, init) {
-    std::string string = std::string(R"(
-        [
-        {
-            "path" : "/mnt/ssd01/clickbench/hot/be/file_cache",
-            "total_size" : 193273528320,
-            "query_limit" : 38654705664
-        },
-        {
-            "path" : "/mnt/ssd01/clickbench/hot/be/file_cache",
-            "total_size" : 193273528320,
-            "query_limit" : 38654705664
-        }
-        ]
-        )");
-    config::enable_file_cache_query_limit = true;
-    std::vector<CachePath> cache_paths;
-    EXPECT_TRUE(parse_conf_cache_paths(string, cache_paths));
-    EXPECT_EQ(cache_paths.size(), 2);
-    for (const auto& cache_path : cache_paths) {
-        io::FileCacheSettings settings = cache_path.init_settings();
-        EXPECT_EQ(settings.capacity, 193273528320);
-        EXPECT_EQ(settings.max_query_cache_size, 38654705664);
-    }
-
-    // err normal
-    std::string err_string = std::string(R"(
-        [
-        {
-            "path" : "/mnt/ssd01/clickbench/hot/be/file_cache",
-            "total_size" : "193273528320",
-            "query_limit" : -1
-        }
-        ]
-        )");
-    cache_paths.clear();
-    EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths));
-
-    // err query_limit
-    err_string = std::string(R"(
-        [
-        {
-            "path" : "/mnt/ssd01/clickbench/hot/be/file_cache",
-            "total_size" : -1
-        }
-        ]
-        )");
-    cache_paths.clear();
-    EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths));
-
-    err_string = std::string(R"(
-        [
-        ]
-        )");
-    cache_paths.clear();
-    EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths));
-}
-
 void test_file_cache(io::FileCacheType cache_type) {
     TUniqueId query_id;
     query_id.hi = 1;
@@ -1066,6 +913,64 @@ void test_file_cache_memory_storage(io::FileCacheType 
cache_type) {
     }
 }
 
+TEST_F(BlockFileCacheTest, init) {
+    std::string string = std::string(R"(
+        [
+        {
+            "path" : "/mnt/ssd01/clickbench/hot/be/file_cache",
+            "total_size" : 193273528320,
+            "query_limit" : 38654705664
+        },
+        {
+            "path" : "/mnt/ssd01/clickbench/hot/be/file_cache",
+            "total_size" : 193273528320,
+            "query_limit" : 38654705664
+        }
+        ]
+        )");
+    config::enable_file_cache_query_limit = true;
+    std::vector<CachePath> cache_paths;
+    EXPECT_TRUE(parse_conf_cache_paths(string, cache_paths));
+    EXPECT_EQ(cache_paths.size(), 2);
+    for (const auto& cache_path : cache_paths) {
+        io::FileCacheSettings settings = cache_path.init_settings();
+        EXPECT_EQ(settings.capacity, 193273528320);
+        EXPECT_EQ(settings.max_query_cache_size, 38654705664);
+    }
+
+    // err normal
+    std::string err_string = std::string(R"(
+        [
+        {
+            "path" : "/mnt/ssd01/clickbench/hot/be/file_cache",
+            "total_size" : "193273528320",
+            "query_limit" : -1
+        }
+        ]
+        )");
+    cache_paths.clear();
+    EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths));
+
+    // err query_limit
+    err_string = std::string(R"(
+        [
+        {
+            "path" : "/mnt/ssd01/clickbench/hot/be/file_cache",
+            "total_size" : -1
+        }
+        ]
+        )");
+    cache_paths.clear();
+    EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths));
+
+    err_string = std::string(R"(
+        [
+        ]
+        )");
+    cache_paths.clear();
+    EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths));
+}
+
 TEST_F(BlockFileCacheTest, normal) {
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
@@ -3495,10 +3400,10 @@ TEST_F(BlockFileCacheTest, state_to_string) {
     EXPECT_EQ(FileBlock::state_to_string(FileBlock::State::DOWNLOADED), 
"DOWNLOADED");
 }
 
-TEST_F(BlockFileCacheTest, string_to_cache_type) {
-    EXPECT_EQ(BlockFileCache::string_to_cache_type("idx"), 
FileCacheType::INDEX);
-    EXPECT_EQ(BlockFileCache::string_to_cache_type("disposable"), 
FileCacheType::DISPOSABLE);
-    EXPECT_EQ(BlockFileCache::string_to_cache_type("ttl"), FileCacheType::TTL);
+TEST_F(BlockFileCacheTest, surfix_to_cache_type) {
+    EXPECT_EQ(surfix_to_cache_type("idx"), FileCacheType::INDEX);
+    EXPECT_EQ(surfix_to_cache_type("disposable"), FileCacheType::DISPOSABLE);
+    EXPECT_EQ(surfix_to_cache_type("ttl"), FileCacheType::TTL);
 }
 
 TEST_F(BlockFileCacheTest, append_many_time) {
@@ -5349,7 +5254,7 @@ TEST_F(BlockFileCacheTest, test_load) {
 
     {
         auto type = cache.dump_single_cache_type(key, 10086);
-        ASSERT_TRUE(type == "_ttl");
+        ASSERT_TRUE(type == "ttl");
         auto holder = cache.get_or_set(key, 10086, 3, context);
         auto blocks = fromHolder(holder);
         ASSERT_EQ(blocks.size(), 1);
@@ -5370,7 +5275,7 @@ TEST_F(BlockFileCacheTest, test_load) {
     }
     {
         auto type = cache.dump_single_cache_type(key, 20086);
-        ASSERT_TRUE(type == "_ttl");
+        ASSERT_TRUE(type == "ttl");
         auto holder = cache.get_or_set(key, 20086, 3, context);
         auto blocks = fromHolder(holder);
         ASSERT_EQ(blocks.size(), 1);
@@ -5995,9 +5900,8 @@ TEST_F(BlockFileCacheTest, seize_after_full) {
     };
 
     for (auto& args : args_vec) {
-        std::cout << "filled with " << 
io::BlockFileCache::cache_type_to_string(args.first_type)
-                  << " and seize with "
-                  << 
io::BlockFileCache::cache_type_to_string(args.second_type) << std::endl;
+        std::cout << "filled with " << 
io::cache_type_to_string(args.first_type)
+                  << " and seize with " << 
io::cache_type_to_string(args.second_type) << std::endl;
         if (fs::exists(cache_base_path)) {
             fs::remove_all(cache_base_path);
         }
diff --git a/be/test/io/cache/block_file_cache_test_common.h 
b/be/test/io/cache/block_file_cache_test_common.h
new file mode 100644
index 00000000000..0bf4acf2466
--- /dev/null
+++ b/be/test/io/cache/block_file_cache_test_common.h
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/tests/gtest_lru_file_cache.cpp
+// and modified by Doris
+
+#pragma once
+
+#include <gen_cpp/Types_types.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <stddef.h>
+
+#include "runtime/thread_context.h"
+#if defined(__APPLE__)
+#include <sys/mount.h>
+#else
+#include <sys/statfs.h>
+#endif
+
+// IWYU pragma: no_include <bits/chrono.h>
+#include <gtest/gtest.h>
+
+#include <chrono> // IWYU pragma: keep
+#include <condition_variable>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <random>
+#include <ranges>
+#include <stdexcept>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "common/config.h"
+#include "cpp/sync_point.h"
+#include "gtest/gtest_pred_impl.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/block_file_cache_profile.h"
+#include "io/cache/cached_remote_file_reader.h"
+#include "io/cache/file_block.h"
+#include "io/cache/file_cache_common.h"
+#include "io/cache/fs_file_cache_storage.h"
+#include "io/fs/path.h"
+#include "olap/options.h"
+#include "runtime/exec_env.h"
+#include "util/slice.h"
+#include "util/time.h"
+
+namespace doris::io {
+namespace fs = std::filesystem;
+
+extern int disk_used_percentage(const std::string& path, std::pair<int, int>* 
percent);
+extern fs::path caches_dir;
+extern std::string cache_base_path;
+extern std::string tmp_file;
+
+constexpr unsigned long long operator"" _mb(unsigned long long m) {
+    return m * 1024 * 1024;
+}
+
+constexpr unsigned long long operator"" _kb(unsigned long long m) {
+    return m * 1024;
+}
+
+extern void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr 
file_block,
+                         const io::FileBlock::Range& expected_range,
+                         io::FileBlock::State expected_state);
+extern std::vector<io::FileBlockSPtr> fromHolder(const io::FileBlocksHolder& 
holder);
+extern void download(io::FileBlockSPtr file_block, size_t size = 0);
+extern void download_into_memory(io::FileBlockSPtr file_block, size_t size = 
0);
+extern void complete(const io::FileBlocksHolder& holder);
+extern void complete_into_memory(const io::FileBlocksHolder& holder);
+extern void test_file_cache(io::FileCacheType cache_type);
+extern void test_file_cache_memory_storage(io::FileCacheType cache_type);
+
+class BlockFileCacheTest : public testing::Test {
+public:
+    static void SetUpTestSuite() {
+        config::file_cache_enter_disk_resource_limit_mode_percent = 99;
+        config::enable_evict_file_cache_in_advance = false; // disable evict in
+                                                            // advance for most
+                                                            // cases for simple
+                                                            // verification
+        bool exists {false};
+        ASSERT_TRUE(global_local_filesystem()->exists(caches_dir, 
&exists).ok());
+        if (!exists) {
+            
ASSERT_TRUE(global_local_filesystem()->create_directory(caches_dir).ok());
+        }
+        ASSERT_TRUE(global_local_filesystem()->exists(tmp_file, &exists).ok());
+        if (!exists) {
+            FileWriterPtr writer;
+            ASSERT_TRUE(global_local_filesystem()->create_file(tmp_file, 
&writer).ok());
+            for (int i = 0; i < 10; i++) {
+                std::string data(1_mb, '0' + i);
+                ASSERT_TRUE(writer->append(Slice(data.data(), 
data.size())).ok());
+            }
+            std::string data(1, '0');
+            ASSERT_TRUE(writer->append(Slice(data.data(), data.size())).ok());
+            ASSERT_TRUE(writer->close().ok());
+        }
+        ExecEnv::GetInstance()->_file_cache_factory = factory.get();
+        ExecEnv::GetInstance()->_file_cache_open_fd_cache = 
std::make_unique<io::FDCache>();
+    }
+    static void TearDownTestSuite() {
+        config::file_cache_enter_disk_resource_limit_mode_percent = 99;
+        ExecEnv::GetInstance()->_file_cache_open_fd_cache.reset(nullptr);
+    }
+
+private:
+    inline static std::unique_ptr<FileCacheFactory> factory = 
std::make_unique<FileCacheFactory>();
+};
+
+} // end of namespace doris::io
\ No newline at end of file
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp 
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
new file mode 100644
index 00000000000..ea3cb63601e
--- /dev/null
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -0,0 +1,404 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/tests/gtest_lru_file_cache.cpp
+// and modified by Doris
+
+#include "block_file_cache_test_common.h"
+
+namespace doris::io {
+
+TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) {
+    config::enable_evict_file_cache_in_advance = false;
+    config::file_cache_enter_disk_resource_limit_mode_percent = 99;
+    config::file_cache_background_lru_dump_interval_ms = 3000;
+    config::file_cache_background_lru_dump_update_cnt_threshold = 0;
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+
+    settings.ttl_queue_size = 5000000;
+    settings.ttl_queue_elements = 50000;
+    settings.query_queue_size = 5000000;
+    settings.query_queue_elements = 50000;
+    settings.index_queue_size = 5000000;
+    settings.index_queue_elements = 50000;
+    settings.disposable_queue_size = 5000000;
+    settings.disposable_queue_elements = 50000;
+    settings.capacity = 20000000;
+    settings.max_file_block_size = 100000;
+    settings.max_query_cache_size = 30;
+
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    int i = 0;
+    for (; i < 100; i++) {
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    ASSERT_TRUE(cache.get_async_open_success());
+
+    io::CacheContext context1;
+    ReadStatistics rstats;
+    context1.stats = &rstats;
+    context1.cache_type = io::FileCacheType::NORMAL;
+    context1.query_id = query_id;
+    auto key1 = io::BlockFileCache::hash("key1");
+
+    int64_t offset = 0;
+
+    for (; offset < 500000; offset += 100000) {
+        auto holder = cache.get_or_set(key1, offset, 100000, context1);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    io::CacheContext context2;
+    context2.stats = &rstats;
+    context2.cache_type = io::FileCacheType::INDEX;
+    context2.query_id = query_id;
+    auto key2 = io::BlockFileCache::hash("key2");
+
+    offset = 0;
+
+    for (; offset < 500000; offset += 100000) {
+        auto holder = cache.get_or_set(key2, offset, 100000, context2);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    io::CacheContext context3;
+    context3.stats = &rstats;
+    context3.cache_type = io::FileCacheType::TTL;
+    context3.query_id = query_id;
+    context3.expiration_time = UnixSeconds() + 120;
+    auto key3 = io::BlockFileCache::hash("key3");
+
+    offset = 0;
+
+    for (; offset < 500000; offset += 100000) {
+        auto holder = cache.get_or_set(key3, offset, 100000, context3);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+
+    io::CacheContext context4;
+    context4.stats = &rstats;
+    context4.cache_type = io::FileCacheType::DISPOSABLE;
+    context4.query_id = query_id;
+    auto key4 = io::BlockFileCache::hash("key4");
+
+    offset = 0;
+
+    for (; offset < 500000; offset += 100000) {
+        auto holder = cache.get_or_set(key4, offset, 100000, context4);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+
+        assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download(blocks[0]);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 
99999),
+                     io::FileBlock::State::DOWNLOADED);
+
+        blocks.clear();
+    }
+    ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 500000);
+    ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 500000);
+    ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 500000);
+    ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 500000);
+
+    // all queues are filled, let's check the lru log records
+    ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size(), 5);
+    ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size(), 5);
+    ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size(), 5);
+    ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size(), 5);
+
+    // then check the log replay
+    std::this_thread::sleep_for(std::chrono::milliseconds(
+            2 * config::file_cache_background_lru_log_replay_interval_ms));
+    
ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 5);
+    
ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(), 
5);
+    
ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 
5);
+    
ASSERT_EQ(cache._lru_recorder->_shadow_disposable_queue.get_elements_num_unsafe(),
 5);
+
+    // ok, let's do some MOVETOBACK & REMOVE
+    {
+        auto holder = cache.get_or_set(key2, 200000, 100000,
+                                       context2); // move index queue 3rd 
element to the end
+        cache.remove_if_cached(key3);             // remove all elements from the ttl queue
+    }
+    ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size(), 5);
+    ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size(), 1);
+    ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size(), 0);
+    ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size(), 0);
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(
+            2 * config::file_cache_background_lru_log_replay_interval_ms));
+    
ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 0);
+    
ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(), 
5);
+    
ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 
5);
+    
ASSERT_EQ(cache._lru_recorder->_shadow_disposable_queue.get_elements_num_unsafe(),
 5);
+
+    // check the order
+    std::vector<size_t> offsets;
+    for (auto it = cache._lru_recorder->_shadow_index_queue.begin();
+         it != cache._lru_recorder->_shadow_index_queue.end(); ++it) {
+        offsets.push_back(it->offset);
+    }
+    ASSERT_EQ(offsets.size(), 5);
+    ASSERT_EQ(offsets[0], 0);
+    ASSERT_EQ(offsets[1], 100000);
+    ASSERT_EQ(offsets[2], 300000);
+    ASSERT_EQ(offsets[3], 400000);
+    ASSERT_EQ(offsets[4], 200000);
+
+    std::this_thread::sleep_for(
+            std::chrono::milliseconds(2 * 
config::file_cache_background_lru_dump_interval_ms));
+
+#if 0
+    // Verify all 4 dump files
+    // TODO(zhengyu): abstract those read/write into a function
+    {
+        std::string filename = fmt::format("{}/lru_dump_{}.tail", 
cache_base_path, "ttl");
+
+        struct stat file_stat;
+        EXPECT_EQ(stat(filename.c_str(), &file_stat), 0) << "File " << 
filename << " not found";
+
+        EXPECT_EQ(file_stat.st_size, 12) << "File " << filename << " has more 
data than footer";
+        std::ifstream in(filename, std::ios::binary);
+        ASSERT_TRUE(in) << "Failed to open " << filename;
+        size_t entry_num = 0;
+        int8_t version = 0;
+        char magic_str[3];
+        char target_str[3] = {'D', 'O', 'R'};
+        in.read(reinterpret_cast<char*>(&entry_num), sizeof(entry_num));
+        in.read(reinterpret_cast<char*>(&version), sizeof(version));
+        in.read(magic_str, sizeof(magic_str));
+        EXPECT_EQ(entry_num, 0);
+        EXPECT_EQ(version, 1);
+        EXPECT_TRUE(memcmp(magic_str, target_str, sizeof(magic_str)) == 0);
+    }
+
+    {
+        std::string filename = fmt::format("{}/lru_dump_{}.tail", 
cache_base_path, "normal");
+
+        struct stat file_stat;
+        EXPECT_EQ(stat(filename.c_str(), &file_stat), 0) << "File " << 
filename << " not found";
+
+        EXPECT_GT(file_stat.st_size, 12) << "File " << filename << " is empty";
+
+        std::ifstream in(filename, std::ios::binary);
+        ASSERT_TRUE(in) << "Failed to open " << filename;
+        UInt128Wrapper hash;
+        size_t offset, size;
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        in.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+        in.read(reinterpret_cast<char*>(&size), sizeof(size));
+
+        EXPECT_FALSE(in.fail()) << "Failed to read from " << filename;
+        EXPECT_EQ(hash, io::BlockFileCache::hash("key1")) << "wrong hash value 
in " << filename;
+        EXPECT_EQ(offset, 0) << "wrong offset value in " << filename;
+        EXPECT_EQ(size, 100000) << "wrong size value in " << filename;
+
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        in.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+        in.read(reinterpret_cast<char*>(&size), sizeof(size));
+
+        EXPECT_FALSE(in.fail()) << "Failed to read from " << filename;
+        EXPECT_EQ(hash, io::BlockFileCache::hash("key1")) << "wrong hash value 
in " << filename;
+        EXPECT_EQ(offset, 100000) << "wrong offset value in " << filename;
+        EXPECT_EQ(size, 100000) << "wrong size value in " << filename;
+
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        in.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+        in.read(reinterpret_cast<char*>(&size), sizeof(size));
+
+        EXPECT_FALSE(in.fail()) << "Failed to read from " << filename;
+        EXPECT_EQ(hash, io::BlockFileCache::hash("key1")) << "wrong hash value 
in " << filename;
+        EXPECT_EQ(offset, 200000) << "wrong offset value in " << filename;
+        EXPECT_EQ(size, 100000) << "wrong size value in " << filename;
+
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        in.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+        in.read(reinterpret_cast<char*>(&size), sizeof(size));
+
+        EXPECT_FALSE(in.fail()) << "Failed to read from " << filename;
+        EXPECT_EQ(hash, io::BlockFileCache::hash("key1")) << "wrong hash value 
in " << filename;
+        EXPECT_EQ(offset, 300000) << "wrong offset value in " << filename;
+        EXPECT_EQ(size, 100000) << "wrong size value in " << filename;
+
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        in.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+        in.read(reinterpret_cast<char*>(&size), sizeof(size));
+
+        EXPECT_FALSE(in.fail()) << "Failed to read from " << filename;
+        EXPECT_EQ(hash, io::BlockFileCache::hash("key1")) << "wrong hash value 
in " << filename;
+        EXPECT_EQ(offset, 400000) << "wrong offset value in " << filename;
+        EXPECT_EQ(size, 100000) << "wrong size value in " << filename;
+
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        EXPECT_TRUE(in.fail()) << "still read from " << filename << " which 
should be EOF";
+    }
+
+    {
+        std::string filename = fmt::format("{}/lru_dump_{}.tail", 
cache_base_path, "index");
+
+        struct stat file_stat;
+        EXPECT_EQ(stat(filename.c_str(), &file_stat), 0) << "File " << 
filename << " not found";
+
+        EXPECT_GT(file_stat.st_size, 12) << "File " << filename << " is empty";
+
+        std::ifstream in(filename, std::ios::binary);
+        ASSERT_TRUE(in) << "Failed to open " << filename;
+        UInt128Wrapper hash;
+        size_t offset, size;
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        in.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+        in.read(reinterpret_cast<char*>(&size), sizeof(size));
+
+        EXPECT_FALSE(in.fail()) << "Failed to read from " << filename;
+        EXPECT_EQ(hash, io::BlockFileCache::hash("key2")) << "wrong hash value 
in " << filename;
+        EXPECT_EQ(offset, 0) << "wrong offset value in " << filename;
+        EXPECT_EQ(size, 100000) << "wrong size value in " << filename;
+
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        in.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+        in.read(reinterpret_cast<char*>(&size), sizeof(size));
+
+        EXPECT_FALSE(in.fail()) << "Failed to read from " << filename;
+        EXPECT_EQ(hash, io::BlockFileCache::hash("key2")) << "wrong hash value 
in " << filename;
+        EXPECT_EQ(offset, 100000) << "wrong offset value in " << filename;
+        EXPECT_EQ(size, 100000) << "wrong size value in " << filename;
+
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        in.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+        in.read(reinterpret_cast<char*>(&size), sizeof(size));
+
+        EXPECT_FALSE(in.fail()) << "Failed to read from " << filename;
+        EXPECT_EQ(hash, io::BlockFileCache::hash("key2")) << "wrong hash value in " << filename;
+        EXPECT_EQ(offset, 300000) << "wrong offset value in " << filename;
+        EXPECT_EQ(size, 100000) << "wrong size value in " << filename;
+
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        in.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+        in.read(reinterpret_cast<char*>(&size), sizeof(size));
+
+        EXPECT_FALSE(in.fail()) << "Failed to read from " << filename;
+        EXPECT_EQ(hash, io::BlockFileCache::hash("key2")) << "wrong hash value in " << filename;
+        EXPECT_EQ(offset, 400000) << "wrong offset value in " << filename;
+        EXPECT_EQ(size, 100000) << "wrong size value in " << filename;
+
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        in.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+        in.read(reinterpret_cast<char*>(&size), sizeof(size));
+
+        EXPECT_FALSE(in.fail()) << "Failed to read from " << filename;
+        EXPECT_EQ(hash, io::BlockFileCache::hash("key2")) << "wrong hash value in " << filename;
+        EXPECT_EQ(offset, 200000) << "wrong offset value in " << filename;
+        EXPECT_EQ(size, 100000) << "wrong size value in " << filename;
+
+        in.read(reinterpret_cast<char*>(&hash), sizeof(hash));
+        EXPECT_TRUE(in.fail()) << "still read from " << filename << " which should be EOF";
+    }
+#endif
+
+    // dump looks good, let's try restore
+    io::BlockFileCache cache2(cache_base_path, settings);
+    ASSERT_TRUE(cache2.initialize());
+    for (i = 0; i < 100; i++) {
+        if (cache2.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    ASSERT_TRUE(cache2.get_async_open_success());
+
+    // check the size of cache2
+    ASSERT_EQ(cache2._ttl_queue.get_elements_num_unsafe(), 0);
+    ASSERT_EQ(cache2._index_queue.get_elements_num_unsafe(), 5);
+    ASSERT_EQ(cache2._normal_queue.get_elements_num_unsafe(), 5);
+    ASSERT_EQ(cache2._disposable_queue.get_elements_num_unsafe(), 5);
+    ASSERT_EQ(cache2._cur_cache_size, 1500000);
+
+    // then check the order of restored cache2
+    std::vector<size_t> offsets2;
+    for (auto it = cache2._index_queue.begin(); it != cache2._index_queue.end(); ++it) {
+        offsets2.push_back(it->offset);
+    }
+    ASSERT_EQ(offsets2.size(), 5);
+    ASSERT_EQ(offsets2[0], 0);
+    ASSERT_EQ(offsets2[1], 100000);
+    ASSERT_EQ(offsets2[2], 300000);
+    ASSERT_EQ(offsets2[3], 400000);
+    ASSERT_EQ(offsets2[4], 200000);
+
+    io::CacheContext context22;
+    context22.stats = &rstats;
+    context22.cache_type = io::FileCacheType::INDEX;
+    context22.query_id = query_id;
+
+    offset = 0;
+
+    for (; offset < 500000; offset += 100000) {
+        auto holder = cache2.get_or_set(key2, offset, 100000, context22);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+        assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999),
+                     io::FileBlock::State::DOWNLOADED);
+        blocks.clear();
+    }
+
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
+} // namespace doris::io
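For orientation, the raw-binary assertions above (the branch closed by the `#endif`) treat each lru_dump_<queue>.tail file as a flat sequence of fixed-width (hash, offset, size) records read until EOF. Below is a minimal standalone reader sketch, illustrative only: the struct name RawTailRecord and the 16-byte hash stand-in are assumptions, and the durable dump format introduced by this patch is the protobuf schema shown further down in this mail.

    #include <cstdint>
    #include <fstream>
    #include <string>
    #include <vector>

    // Illustrative stand-in for one record as the test reads it:
    // a 16-byte hash (doris::io::UInt128Wrapper in the real code), then offset and size.
    struct RawTailRecord {
        unsigned __int128 hash;
        size_t offset;
        size_t size;
    };

    // Read records until the stream fails, mirroring the final EXPECT_TRUE(in.fail()) check.
    std::vector<RawTailRecord> read_tail(const std::string& filename) {
        std::vector<RawTailRecord> records;
        std::ifstream in(filename, std::ios::binary);
        RawTailRecord rec {};
        while (in.read(reinterpret_cast<char*>(&rec.hash), sizeof(rec.hash)) &&
               in.read(reinterpret_cast<char*>(&rec.offset), sizeof(rec.offset)) &&
               in.read(reinterpret_cast<char*>(&rec.size), sizeof(rec.size))) {
            records.push_back(rec);
        }
        return records;
    }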
diff --git a/be/test/io/cache/cache_lru_dumper_test.cpp b/be/test/io/cache/cache_lru_dumper_test.cpp
new file mode 100644
index 00000000000..fd4260b012b
--- /dev/null
+++ b/be/test/io/cache/cache_lru_dumper_test.cpp
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/file_block.h"
+#include "io/cache/file_cache_common.h"
+
+using ::testing::_;
+using ::testing::Return;
+using ::testing::NiceMock;
+
+namespace doris::io {
+std::mutex _mutex;
+
+class MockBlockFileCache : public BlockFileCache {
+public:
+    LRUQueue* dst_queue; // Pointer to the destination queue
+
+    MockBlockFileCache(LRUQueue* queue) : BlockFileCache("", {}), dst_queue(queue) {
+        _cache_base_path = "./";
+    }
+
+    FileBlockCell* add_cell(const UInt128Wrapper& hash, const CacheContext& ctx, size_t offset,
+                            size_t size, FileBlock::State state,
+                            std::lock_guard<std::mutex>& lock) {
+        dst_queue->add(hash, offset, size, lock);
+        return nullptr;
+    }
+
+    std::mutex& mutex() { return _mutex; }
+
+private:
+    std::mutex _mutex;
+    struct {
+        std::string _cache_base_path;
+    } _mgr;
+};
+
+class CacheLRUDumperTest : public ::testing::Test {
+protected:
+    LRUQueue dst_queue; // Member variable for destination queue
+
+    void SetUp() override {
+        mock_cache = std::make_unique<NiceMock<MockBlockFileCache>>(&dst_queue);
+        recorder = std::make_unique<LRUQueueRecorder>(mock_cache.get());
+
+        dumper = std::make_unique<CacheLRUDumper>(mock_cache.get(), recorder.get());
+    }
+
+    void TearDown() override {
+        dumper.reset();
+        mock_cache.reset();
+    }
+
+    std::unique_ptr<NiceMock<MockBlockFileCache>> mock_cache;
+    std::unique_ptr<CacheLRUDumper> dumper;
+    std::unique_ptr<LRUQueueRecorder> recorder;
+};
+
+TEST_F(CacheLRUDumperTest, test_finalize_dump_and_parse_dump_footer) {
+    std::ofstream out("test_finalize.bin", std::ios::binary);
+    std::string tmp_filename = "test_finalize.bin.tmp";
+    std::string final_filename = "test_finalize.bin";
+    size_t file_size = 0;
+    size_t entry_num = 10;
+
+    // Test finalize dump
+    EXPECT_TRUE(
+            dumper->finalize_dump(out, entry_num, tmp_filename, final_filename, file_size).ok());
+
+    // Test parse footer
+    std::ifstream in("test_finalize.bin", std::ios::binary);
+    size_t parsed_entry_num = 0;
+    EXPECT_TRUE(dumper->parse_dump_footer(in, final_filename, parsed_entry_num).ok());
+    EXPECT_EQ(entry_num, parsed_entry_num);
+
+    out.close();
+    in.close();
+    std::remove("test_finalize.bin");
+}
+
+TEST_F(CacheLRUDumperTest, test_remove_lru_dump_files) {
+    // Create test files
+    std::vector<std::string> queue_names = {"disposable", "index", "normal", "ttl"};
+    for (const auto& name : queue_names) {
+        std::ofstream(fmt::format("lru_dump_{}.tail", name));
+    }
+
+    // Test remove
+    dumper->remove_lru_dump_files();
+
+    // Verify files are removed
+    for (const auto& name : queue_names) {
+        EXPECT_FALSE(std::filesystem::exists(fmt::format("lru_dump_{}.tail", name)));
+    }
+}
+
+TEST_F(CacheLRUDumperTest, test_dump_and_restore_queue) {
+    LRUQueue src_queue;
+    std::string queue_name = "normal";
+
+    // Add test data
+    UInt128Wrapper hash(123456789ULL);
+    size_t offset = 1024;
+    size_t size = 4096;
+    std::lock_guard<std::mutex> lock(_mutex);
+    src_queue.add(hash, offset, size, lock);
+
+    // Test dump
+    dumper->do_dump_queue(src_queue, queue_name);
+
+    // Test restore
+    std::lock_guard<std::mutex> cache_lock(mock_cache->mutex());
+    dumper->restore_queue(dst_queue, queue_name, cache_lock);
+
+    // Verify queue content and order
+    auto src_it = src_queue.begin();
+    auto dst_it = dst_queue.begin();
+    while (src_it != src_queue.end() && dst_it != dst_queue.end()) {
+        EXPECT_EQ(src_it->hash, dst_it->hash);
+        EXPECT_EQ(src_it->offset, dst_it->offset);
+        EXPECT_EQ(src_it->size, dst_it->size);
+        ++src_it;
+        ++dst_it;
+    }
+
+    // Clean up
+    std::remove(fmt::format("lru_dump_{}.tail", queue_name).c_str());
+}
+
+} // namespace doris::io
\ No newline at end of file
diff --git a/be/test/io/cache/lru_queue_test.cpp b/be/test/io/cache/lru_queue_test.cpp
new file mode 100644
index 00000000000..4a01fb27e3d
--- /dev/null
+++ b/be/test/io/cache/lru_queue_test.cpp
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/Types_types.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <stddef.h>
+
+// IWYU pragma: no_include <bits/chrono.h>
+#include <gtest/gtest.h>
+
+#include <list>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <random>
+#include <ranges>
+#include <stdexcept>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "common/config.h"
+#include "cpp/sync_point.h"
+#include "gtest/gtest_pred_impl.h"
+#include "io/cache/block_file_cache.h"
+#include "util/time.h"
+
+using namespace doris::io;
+
+class LRUQueueTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        queue1 = std::make_shared<LRUQueue>();
+        queue2 = std::make_shared<LRUQueue>();
+    }
+
+    std::shared_ptr<LRUQueue> queue1;
+    std::shared_ptr<LRUQueue> queue2;
+};
+
+TEST_F(LRUQueueTest, SameQueueDistance) {
+    std::mutex mutex;
+    std::lock_guard lock(mutex);
+
+    queue1->add(UInt128Wrapper(123), 0, 1024, lock);
+    queue1->add(UInt128Wrapper(456), 0, 1024, lock);
+
+    EXPECT_EQ(queue1->levenshtein_distance_from(*queue1, lock), 0);
+}
+
+TEST_F(LRUQueueTest, DifferentQueueDistance) {
+    std::mutex mutex;
+    std::lock_guard lock(mutex);
+
+    queue1->add(UInt128Wrapper(123), 0, 1024, lock);
+    queue1->add(UInt128Wrapper(456), 0, 1024, lock);
+
+    queue2->add(UInt128Wrapper(123), 0, 1024, lock);
+    queue2->add(UInt128Wrapper(789), 0, 1024, lock);
+
+    EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 1);
+}
+
+TEST_F(LRUQueueTest, EmptyQueueDistance) {
+    std::mutex mutex;
+    std::lock_guard lock(mutex);
+
+    queue1->add(UInt128Wrapper(123), 0, 1024, lock);
+    queue1->add(UInt128Wrapper(456), 0, 1024, lock);
+
+    EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 2);
+}
+
+TEST_F(LRUQueueTest, PartialMatchDistance) {
+    std::mutex mutex;
+    std::lock_guard lock(mutex);
+
+    queue1->add(UInt128Wrapper(123), 0, 1024, lock);
+    queue1->add(UInt128Wrapper(456), 0, 1024, lock);
+    queue1->add(UInt128Wrapper(789), 0, 1024, lock);
+
+    queue2->add(UInt128Wrapper(123), 0, 1024, lock);
+    queue2->add(UInt128Wrapper(101), 0, 1024, lock);
+    queue2->add(UInt128Wrapper(789), 0, 1024, lock);
+
+    EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 1);
+}
+
+TEST_F(LRUQueueTest, SameElementsDifferentOrder) {
+    std::mutex mutex;
+    std::lock_guard lock(mutex);
+
+    queue1->add(UInt128Wrapper(123), 0, 1024, lock);
+    queue1->add(UInt128Wrapper(456), 0, 1024, lock);
+    queue1->add(UInt128Wrapper(789), 0, 1024, lock);
+
+    queue2->add(UInt128Wrapper(789), 0, 1024, lock);
+    queue2->add(UInt128Wrapper(456), 0, 1024, lock);
+    queue2->add(UInt128Wrapper(123), 0, 1024, lock);
+
+    EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 2);
+}
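The distances these tests expect are ordinary Levenshtein (edit) distance over the queues' entry sequences: 0 for identical queues, 1 per substituted entry, the full length against an empty queue, and 2 for a reversed triple. A small reference implementation under that assumption is sketched below; the free function edit_distance is illustrative only and not the LRUQueue API.

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Classic two-row dynamic-programming edit distance over opaque entry ids.
    size_t edit_distance(const std::vector<uint64_t>& a, const std::vector<uint64_t>& b) {
        std::vector<size_t> prev(b.size() + 1), cur(b.size() + 1);
        for (size_t j = 0; j <= b.size(); ++j) prev[j] = j;
        for (size_t i = 1; i <= a.size(); ++i) {
            cur[0] = i;
            for (size_t j = 1; j <= b.size(); ++j) {
                size_t subst = prev[j - 1] + (a[i - 1] == b[j - 1] ? 0 : 1);
                cur[j] = std::min({prev[j] + 1, cur[j - 1] + 1, subst});
            }
            std::swap(prev, cur);
        }
        return prev[b.size()];
    }

    // e.g. edit_distance({123, 456, 789}, {789, 456, 123}) == 2,
    // matching the SameElementsDifferentOrder expectation above.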
diff --git a/be/src/http/action/shrink_mem_action.cpp b/gensrc/proto/file_cache.proto
similarity index 51%
copy from be/src/http/action/shrink_mem_action.cpp
copy to gensrc/proto/file_cache.proto
index 66331f4a943..f11375586aa 100644
--- a/be/src/http/action/shrink_mem_action.cpp
+++ b/gensrc/proto/file_cache.proto
@@ -14,28 +14,36 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+// Defines the on-disk format structures for the file cache LRU dump (entries, groups, and meta).
 
-#include "http/action/shrink_mem_action.h"
-
-#include <fmt/core.h>
-
-#include "http/http_channel.h"
-#include "http/http_request.h"
-#include "runtime/exec_env.h"
-#include "runtime/memory/memory_reclamation.h"
-#include "util/brpc_client_cache.h"
-#include "util/mem_info.h"
-#include "util/string_util.h"
-
-namespace doris {
-void ShrinkMemAction::handle(HttpRequest* req) {
-    LOG(INFO) << "begin shrink memory";
-    /* this interface might be ready for cloud in the near future
-     * int freed_mem = 0;
-     * doris::MemInfo::process_cache_gc(&freed_mem); */
-    MemoryReclamation::revoke_process_memory("ShrinkMemAction");
-    LOG(INFO) << "shrink memory triggered, using Process GC Free Memory";
-    HttpChannel::send_reply(req, HttpStatus::OK, "shrinking");
+syntax="proto2";
+
+package doris.io.cache;
+
+message UInt128WrapperPb {
+    optional uint64 high = 1;
+    optional uint64 low = 2;
+}
+
+message LRUDumpEntryPb {
+    optional UInt128WrapperPb hash = 1;
+    optional uint64 offset = 2;
+    optional uint64 size = 3;
+}
+
+message LRUDumpEntryGroupPb {
+    repeated LRUDumpEntryPb entries = 1;
+}
+
+message EntryGroupOffsetSizePb {
+    optional uint64 offset = 1;
+    optional uint64 size = 2;
+    optional uint32 checksum = 3;
+}
+
+message LRUDumpMetaPb{
+    optional uint64 entry_num = 1;
+    optional string queue_name = 2;
+    repeated EntryGroupOffsetSizePb group_offset_size = 3;
 }
 
-} // namespace doris
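To make the schema above concrete: a dump file stores serialized LRUDumpEntryGroupPb blocks, and LRUDumpMetaPb records each group's location, size, and checksum plus the total entry count. A hedged usage sketch with the generated C++ classes follows; the include path, the helper name record_group, and the std::hash-based checksum are assumptions for illustration, not the exact CacheLRUDumper implementation.

    #include <gen_cpp/file_cache.pb.h> // assumed include path for the generated classes

    #include <cstdint>
    #include <functional>
    #include <string>

    // Serialize one single-entry group and note where it will live in the dump file.
    void record_group(doris::io::cache::LRUDumpMetaPb* meta, uint64_t file_offset,
                      uint64_t hash_high, uint64_t hash_low, uint64_t offset, uint64_t size) {
        doris::io::cache::LRUDumpEntryGroupPb group;
        auto* entry = group.add_entries();
        entry->mutable_hash()->set_high(hash_high);
        entry->mutable_hash()->set_low(hash_low);
        entry->set_offset(offset);
        entry->set_size(size);

        std::string bytes;
        group.SerializeToString(&bytes);

        auto* loc = meta->add_group_offset_size();
        loc->set_offset(file_offset); // byte position of the serialized group in the dump file
        loc->set_size(bytes.size());  // serialized length of the group
        loc->set_checksum(static_cast<uint32_t>(std::hash<std::string> {}(bytes))); // placeholder checksum
        meta->set_entry_num(meta->entry_num() + group.entries_size());
    }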
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 9b948a3c303..3abbed2c399 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -168,6 +168,9 @@ class ServerNode {
         assert false : 'Unknown node type'
     }
 
+    String getBasePath() {
+        return path
+    }
 }
 
 class Frontend extends ServerNode {
diff --git a/regression-test/suites/demo_p0/test_lru_persist.groovy b/regression-test/suites/demo_p0/test_lru_persist.groovy
new file mode 100644
index 00000000000..249faadeeda
--- /dev/null
+++ b/regression-test/suites/demo_p0/test_lru_persist.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+// Run docker suite steps:
+// 1. Read 'docker/runtime/doris-compose/Readme.md' and make sure you can set up a doris docker cluster;
+// 2. Update regression-conf-custom.groovy with config:
+//    image = "xxxx"                // your doris docker image
+//    excludeDockerTest = false     // do run docker suites, default is true
+//    dockerEndDeleteFiles = false  // after running a docker suite, whether to delete the container's log and data in directory '/tmp/doris/<suite-name>'
+
+// When running a docker suite, no external doris cluster is needed.
+// But whether to run a docker suite needs more checks.
+// Firstly, get the pipeline's run mode (cloud or not_cloud):
+// If there's an external doris cluster, fetch the pipeline's runMode from it.
+// If there's no external doris cluster, set the pipeline's runMode with command args,
+//    for example:  sh run-regression-test.sh --run docker_action -runMode=cloud/not_cloud
+// Secondly, compare ClusterOptions.cloudMode and the pipeline's runMode:
+// If ClusterOptions.cloudMode = null, let ClusterOptions.cloudMode = pipeline's cloudMode, and run the docker suite.
+// If ClusterOptions.cloudMode = true or false: when cloudMode == pipeline's cloudMode or the pipeline's cloudMode is unknown,
+//      then run the docker suite, otherwise don't run it.
+
+// NOTICE:
+// 1. Add 'docker' to the suite's group, and don't add 'nonConcurrent' to it;
+// 2. In the docker closure:
+//    a. Don't use 'Awaitility.await()...until(f)', but use 'dockerAwaitUntil(..., f)';
+// 3. No need to use code ` if (isCloudMode()) { return } ` in docker suites;
+// using `ClusterOptions.cloudMode = true/false` instead is enough.
+// Because, when running without an external doris cluster, a suite that uses `isCloudMode()` needs an explicit -runMode=cloud/not_cloud.
+// By contrast, `ClusterOptions.cloudMode = true/false` needs no -runMode=cloud/not_cloud when no external doris cluster exists.
+
+suite('test_lru_persist', 'docker') {
+    def options = new ClusterOptions()
+    
+    options.feNum = 1
+    options.beNum = 1
+    options.msNum = 1
+    options.cloudMode = true
+    options.feConfigs += ['example_conf_k1=v1', 'example_conf_k2=v2']
+    options.beConfigs += ['enable_file_cache=true', 'enable_java_support=false', 'file_cache_enter_disk_resource_limit_mode_percent=99',
+                          'file_cache_background_lru_dump_interval_ms=2000', 'file_cache_background_lru_log_replay_interval_ms=500',
+                          'disable_auto_compaction=true', 'file_cache_enter_need_evict_cache_in_advance_percent=99',
+                          'file_cache_background_lru_dump_update_cnt_threshold=0'
+                        ]
+
+    // run another docker
+    docker(options) {
+        cluster.checkFeIsAlive(1, true)
+        cluster.checkBeIsAlive(1, true)
+        sql '''set global enable_auto_analyze=false'''
+
+        sql '''create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10 properties ("replication_num"="1")'''
+        sql '''insert into tb1 values (1),(2),(3)'''
+        sql '''insert into tb1 values (4),(5),(6)'''
+        sql '''insert into tb1 values (7),(8),(9)'''
+        sql '''insert into tb1 values (10),(11),(12)'''
+
+        def be = cluster.getBeByIndex(1)
+        def beBasePath = be.getBasePath()
+        def cachePath = beBasePath + "/storage/file_cache/"
+
+        sleep(15000);
+        cluster.stopBackends(1)
+
+        def normalBefore = "md5sum 
${cachePath}/lru_dump_normal.tail".execute().text.trim().split()[0]
+        logger.info("normalBefore: ${normalBefore}")
+
+        cluster.startBackends(1)
+        sleep(10000);
+
+        cluster.stopBackends(1)
+
+        // check md5sum again after be restart
+        def normalAfter = "md5sum 
${cachePath}/lru_dump_normal.tail".execute().text.trim().split()[0]
+        logger.info("normalAfter: ${normalAfter}")
+
+        assert normalBefore == normalAfter
+    }
+}

