freemandealer commented on code in PR #49456:
URL: https://github.com/apache/doris/pull/49456#discussion_r2174512228


##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        return Status::InternalError<false>("Failed to serialize 
LRUDumpEntryGroupPb");
+    }
+    LOG(INFO) << "Serialized size: " << serialized.size();
+    out.write(serialized.data(), serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, filename));
+
+    // Record group metadata
+    ::doris::io::cache::EntryGroupOffsetSizePb* group_info = 
_dump_meta.add_group_offset_size();
+    group_info->set_offset(group_start);
+    group_info->set_size(serialized.size());
+
+    // Reset for next group
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+    return Status::OK();
+}
+
+Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,
+                                     std::string& tmp_filename, std::string& 
final_filename,
+                                     size_t& file_size) {
+    // Flush any remaining entries
+    if (_current_dump_group_count > 0) {
+        RETURN_IF_ERROR(flush_current_group(out, tmp_filename));
+    }
+
+    // Write meta information
+    _dump_meta.set_entry_num(entry_num);
+    size_t meta_offset = out.tellp();
+    LOG(INFO) << "dump meta: " << _dump_meta.DebugString();
+    std::string meta_serialized;
+    if (!_dump_meta.SerializeToString(&meta_serialized)) {
+        std::string warn_msg =
+                fmt::format("Failed to serialize LRUDumpMetaPb, file={}", 
tmp_filename);
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>("warn_msg");
+    }
+    out.write(meta_serialized.data(), meta_serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    // Write footer
+    Footer footer;
+    footer.meta_offset = htole64(meta_offset); // Explicitly convert to 
little-endian
+    footer.checksum = 0;                       // TODO: Calculate checksum
+    footer.version = 1;
+    std::memcpy(footer.magic, "DOR", 3);
+
+    out.write(reinterpret_cast<const char*>(&footer), sizeof(footer));
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    out.close();
+
+    // Rename tmp to formal file
+    if (std::rename(tmp_filename.c_str(), final_filename.c_str()) != 0) {
+        std::remove(tmp_filename.c_str());
+        file_size = std::filesystem::file_size(final_filename);
+    }
+    _dump_meta.Clear();
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+
+    return Status::OK();
+}
+
+void CacheLRUDumper::dump_queue(LRUQueue& queue, const std::string& 
queue_name) {
+    Status st;
+    std::vector<std::tuple<UInt128Wrapper, size_t, size_t>> elements;
+    elements.reserve(config::file_cache_background_lru_dump_tail_record_num);
+
+    {
+        std::lock_guard<std::mutex> lru_log_lock(_mgr->_mutex_lru_log);
+        size_t count = 0;
+        for (const auto& [hash, offset, size] : queue) {
+            if (count++ >= 
config::file_cache_background_lru_dump_tail_record_num) break;
+            elements.emplace_back(hash, offset, size);
+        }
+    }
+
+    // Write to disk
+    int64_t duration_ns = 0;
+    std::uintmax_t file_size = 0;
+    {
+        SCOPED_RAW_TIMER(&duration_ns);
+        std::string tmp_filename =
+                fmt::format("{}/lru_dump_{}.bin.tmp", _mgr->_cache_base_path, 
queue_name);
+        std::string final_filename =
+                fmt::format("{}/lru_dump_{}.bin", _mgr->_cache_base_path, 
queue_name);
+        std::ofstream out(tmp_filename, std::ios::binary);

Review Comment:
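   Overwrite, not append: `std::ofstream` opened with only `std::ios::binary` truncates any existing tmp file rather than appending to it.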



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();

Review Comment:
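   Use a trace-level log here instead of `LOG(INFO)`: the `DebugString()` of a group of up to 10000 entries is far too verbose for INFO logs.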



##########
be/src/io/cache/block_file_cache.cpp:
##########
@@ -2144,6 +2219,132 @@ void BlockFileCache::update_ttl_atime(const 
UInt128Wrapper& hash) {
     };
 }
 
+BlockFileCache::CacheLRULogQueue& 
BlockFileCache::get_lru_log_queue(FileCacheType type) {
+    ++_lru_queue_update_counters[type];
+    switch (type) {
+    case FileCacheType::INDEX:
+        return _index_lru_log_queue;
+    case FileCacheType::DISPOSABLE:
+        return _disposable_lru_log_queue;
+    case FileCacheType::NORMAL:
+        return _normal_lru_log_queue;
+    case FileCacheType::TTL:
+        return _ttl_lru_log_queue;
+    default:
+        DCHECK(false);
+    }
+    return _normal_lru_log_queue;
+}
+
+void BlockFileCache::record_queue_event(CacheLRULogQueue& log_queue, 
CacheLRULogType log_type,
+                                        const UInt128Wrapper hash, const 
size_t offset,
+                                        const size_t size) {
+    log_queue.push_back(std::make_unique<CacheLRULog>(log_type, hash, offset, 
size));
+}
+
+void BlockFileCache::replay_queue_event(CacheLRULogQueue& log_queue, LRUQueue& 
shadow_queue) {
+    // we don't need the real cache lock for the shadow queue, but we do need 
a lock to prevent read/write contension
+    std::lock_guard<std::mutex> lru_log_lock(_mutex_lru_log);
+    while (!log_queue.empty()) {
+        auto log = std::move(log_queue.front());
+        log_queue.pop_front();
+        try {
+            switch (log->type) {
+            case CacheLRULogType::ADD: {
+                shadow_queue.add(log->hash, log->offset, log->size, 
lru_log_lock);
+                break;
+            }
+            case CacheLRULogType::REMOVE: {
+                auto it = shadow_queue.get(log->hash, log->offset, 
lru_log_lock);
+                if (it != shadow_queue.end()) {
+                    shadow_queue.remove(it, lru_log_lock);
+                } else {
+                    LOG(WARNING) << "REMOVE failed, doesn't exist in shadow 
queue";
+                }
+                break;
+            }
+            case CacheLRULogType::MOVETOBACK: {
+                auto it = shadow_queue.get(log->hash, log->offset, 
lru_log_lock);
+                if (it != shadow_queue.end()) {
+                    shadow_queue.move_to_end(it, lru_log_lock);
+                } else {
+                    LOG(WARNING) << "MOVETOBACK failed, doesn't exist in 
shadow queue";
+                }
+                break;
+            }
+            default:
+                LOG(WARNING) << "Unknown CacheLRULogType: " << 
static_cast<int>(log->type);
+                break;
+            }
+        } catch (const std::exception& e) {
+            LOG(WARNING) << "Failed to replay queue event: " << e.what();
+        }
+    }
+}
+
+void BlockFileCache::run_background_lru_log_replay() {
+    while (!_close) {
+        int64_t interval_ms = 
config::file_cache_background_lru_log_replay_interval_ms;
+        {
+            std::unique_lock close_lock(_close_mtx);
+            _close_cv.wait_for(close_lock, 
std::chrono::milliseconds(interval_ms));
+            if (_close) {
+                break;
+            }
+        }
+
+        replay_queue_event(_ttl_lru_log_queue, _shadow_ttl_queue);
+        replay_queue_event(_index_lru_log_queue, _shadow_index_queue);
+        replay_queue_event(_normal_lru_log_queue, _shadow_normal_queue);
+        replay_queue_event(_disposable_lru_log_queue, 
_shadow_disposable_queue);
+
+        //TODO(zhengyu): add debug facilities to check diff between real and 
shadow queue
+    }
+}
+
+void BlockFileCache::run_background_lru_dump() {
+    while (!_close) {
+        int64_t interval_ms = 
config::file_cache_background_lru_dump_interval_ms;
+        {
+            std::unique_lock close_lock(_close_mtx);
+            _close_cv.wait_for(close_lock, 
std::chrono::milliseconds(interval_ms));
+            if (_close) {
+                break;
+            }
+        }
+
+        if (config::file_cache_background_lru_dump_tail_record_num > 0) {
+            if (_lru_queue_update_counters[FileCacheType::DISPOSABLE] >

Review Comment:
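   Suggest renaming `_lru_queue_update_counters` to something like `update_cnt_from_last_dump`, to make clear it counts queue updates since the last dump.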



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        return Status::InternalError<false>("Failed to serialize 
LRUDumpEntryGroupPb");
+    }
+    LOG(INFO) << "Serialized size: " << serialized.size();
+    out.write(serialized.data(), serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, filename));
+
+    // Record group metadata
+    ::doris::io::cache::EntryGroupOffsetSizePb* group_info = 
_dump_meta.add_group_offset_size();
+    group_info->set_offset(group_start);
+    group_info->set_size(serialized.size());
+
+    // Reset for next group
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+    return Status::OK();
+}
+
+Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,
+                                     std::string& tmp_filename, std::string& 
final_filename,
+                                     size_t& file_size) {
+    // Flush any remaining entries
+    if (_current_dump_group_count > 0) {
+        RETURN_IF_ERROR(flush_current_group(out, tmp_filename));
+    }
+
+    // Write meta information
+    _dump_meta.set_entry_num(entry_num);
+    size_t meta_offset = out.tellp();
+    LOG(INFO) << "dump meta: " << _dump_meta.DebugString();
+    std::string meta_serialized;
+    if (!_dump_meta.SerializeToString(&meta_serialized)) {
+        std::string warn_msg =
+                fmt::format("Failed to serialize LRUDumpMetaPb, file={}", 
tmp_filename);
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>("warn_msg");
+    }
+    out.write(meta_serialized.data(), meta_serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    // Write footer
+    Footer footer;
+    footer.meta_offset = htole64(meta_offset); // Explicitly convert to 
little-endian
+    footer.checksum = 0;                       // TODO: Calculate checksum
+    footer.version = 1;
+    std::memcpy(footer.magic, "DOR", 3);
+
+    out.write(reinterpret_cast<const char*>(&footer), sizeof(footer));
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    out.close();
+
+    // Rename tmp to formal file
+    if (std::rename(tmp_filename.c_str(), final_filename.c_str()) != 0) {
+        std::remove(tmp_filename.c_str());
+        file_size = std::filesystem::file_size(final_filename);
+    }
+    _dump_meta.Clear();
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+
+    return Status::OK();
+}
+
+void CacheLRUDumper::dump_queue(LRUQueue& queue, const std::string& 
queue_name) {
+    Status st;
+    std::vector<std::tuple<UInt128Wrapper, size_t, size_t>> elements;
+    elements.reserve(config::file_cache_background_lru_dump_tail_record_num);
+
+    {
+        std::lock_guard<std::mutex> lru_log_lock(_mgr->_mutex_lru_log);
+        size_t count = 0;
+        for (const auto& [hash, offset, size] : queue) {
+            if (count++ >= 
config::file_cache_background_lru_dump_tail_record_num) break;
+            elements.emplace_back(hash, offset, size);
+        }
+    }
+
+    // Write to disk
+    int64_t duration_ns = 0;
+    std::uintmax_t file_size = 0;
+    {
+        SCOPED_RAW_TIMER(&duration_ns);
+        std::string tmp_filename =
+                fmt::format("{}/lru_dump_{}.bin.tmp", _mgr->_cache_base_path, 
queue_name);
+        std::string final_filename =
+                fmt::format("{}/lru_dump_{}.bin", _mgr->_cache_base_path, 
queue_name);
+        std::ofstream out(tmp_filename, std::ios::binary);
+        if (out) {
+            LOG(INFO) << "begin dump " << queue_name << "with " << 
elements.size() << " elements";
+            for (const auto& [hash, offset, size] : elements) {
+                RETURN_IF_STATUS_ERROR(st,
+                                       dump_one_lru_entry(out, tmp_filename, 
hash, offset, size));
+            }
+            RETURN_IF_STATUS_ERROR(st, finalize_dump(out, elements.size(), 
tmp_filename,
+                                                     final_filename, 
file_size));
+        } else {
+            LOG(WARNING) << "open lru dump file failed";
+        }
+    }
+    *(_mgr->_lru_dump_latency_us) << (duration_ns / 1000);
+    LOG(INFO) << fmt::format("lru dump for {} size={} time={}us", queue_name, 
file_size,

Review Comment:
   Please also add the entry count (`elements.size()`) to this log message.



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;

Review Comment:
   already have 'file_cache_lru_dump_latency_us'



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        return Status::InternalError<false>("Failed to serialize 
LRUDumpEntryGroupPb");
+    }
+    LOG(INFO) << "Serialized size: " << serialized.size();
+    out.write(serialized.data(), serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, filename));
+
+    // Record group metadata
+    ::doris::io::cache::EntryGroupOffsetSizePb* group_info = 
_dump_meta.add_group_offset_size();
+    group_info->set_offset(group_start);
+    group_info->set_size(serialized.size());
+
+    // Reset for next group
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+    return Status::OK();
+}
+
+Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,
+                                     std::string& tmp_filename, std::string& 
final_filename,
+                                     size_t& file_size) {
+    // Flush any remaining entries
+    if (_current_dump_group_count > 0) {
+        RETURN_IF_ERROR(flush_current_group(out, tmp_filename));
+    }
+
+    // Write meta information
+    _dump_meta.set_entry_num(entry_num);
+    size_t meta_offset = out.tellp();
+    LOG(INFO) << "dump meta: " << _dump_meta.DebugString();

Review Comment:
   Use a TRACE-level log here instead of LOG(INFO).



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {

Review Comment:
   Add a WARNING log if serialization fails.



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        return Status::InternalError<false>("Failed to serialize 
LRUDumpEntryGroupPb");
+    }
+    LOG(INFO) << "Serialized size: " << serialized.size();
+    out.write(serialized.data(), serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, filename));
+
+    // Record group metadata
+    ::doris::io::cache::EntryGroupOffsetSizePb* group_info = 
_dump_meta.add_group_offset_size();
+    group_info->set_offset(group_start);
+    group_info->set_size(serialized.size());
+
+    // Reset for next group
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+    return Status::OK();
+}
+
+Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,
+                                     std::string& tmp_filename, std::string& 
final_filename,
+                                     size_t& file_size) {
+    // Flush any remaining entries
+    if (_current_dump_group_count > 0) {
+        RETURN_IF_ERROR(flush_current_group(out, tmp_filename));
+    }
+
+    // Write meta information
+    _dump_meta.set_entry_num(entry_num);
+    size_t meta_offset = out.tellp();
+    LOG(INFO) << "dump meta: " << _dump_meta.DebugString();
+    std::string meta_serialized;
+    if (!_dump_meta.SerializeToString(&meta_serialized)) {
+        std::string warn_msg =
+                fmt::format("Failed to serialize LRUDumpMetaPb, file={}", 
tmp_filename);
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>("warn_msg");
+    }
+    out.write(meta_serialized.data(), meta_serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    // Write footer
+    Footer footer;
+    footer.meta_offset = htole64(meta_offset); // Explicitly convert to 
little-endian
+    footer.checksum = 0;                       // TODO: Calculate checksum
+    footer.version = 1;
+    std::memcpy(footer.magic, "DOR", 3);
+
+    out.write(reinterpret_cast<const char*>(&footer), sizeof(footer));
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    out.close();
+
+    // Rename tmp to formal file
+    if (std::rename(tmp_filename.c_str(), final_filename.c_str()) != 0) {
+        std::remove(tmp_filename.c_str());
+        file_size = std::filesystem::file_size(final_filename);
+    }
+    _dump_meta.Clear();
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+
+    return Status::OK();
+}
+
+void CacheLRUDumper::dump_queue(LRUQueue& queue, const std::string& 
queue_name) {
+    Status st;
+    std::vector<std::tuple<UInt128Wrapper, size_t, size_t>> elements;
+    elements.reserve(config::file_cache_background_lru_dump_tail_record_num);
+
+    {
+        std::lock_guard<std::mutex> lru_log_lock(_mgr->_mutex_lru_log);
+        size_t count = 0;
+        for (const auto& [hash, offset, size] : queue) {
+            if (count++ >= 
config::file_cache_background_lru_dump_tail_record_num) break;
+            elements.emplace_back(hash, offset, size);
+        }
+    }
+
+    // Write to disk
+    int64_t duration_ns = 0;
+    std::uintmax_t file_size = 0;
+    {
+        SCOPED_RAW_TIMER(&duration_ns);
+        std::string tmp_filename =
+                fmt::format("{}/lru_dump_{}.bin.tmp", _mgr->_cache_base_path, 
queue_name);
+        std::string final_filename =
+                fmt::format("{}/lru_dump_{}.bin", _mgr->_cache_base_path, 
queue_name);
+        std::ofstream out(tmp_filename, std::ios::binary);
+        if (out) {
+            LOG(INFO) << "begin dump " << queue_name << "with " << 
elements.size() << " elements";
+            for (const auto& [hash, offset, size] : elements) {
+                RETURN_IF_STATUS_ERROR(st,
+                                       dump_one_lru_entry(out, tmp_filename, 
hash, offset, size));
+            }
+            RETURN_IF_STATUS_ERROR(st, finalize_dump(out, elements.size(), 
tmp_filename,
+                                                     final_filename, 
file_size));
+        } else {
+            LOG(WARNING) << "open lru dump file failed";

Review Comment:
   Append the reason to this log message, e.g. ", reason: tmp_filename create failed".



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        return Status::InternalError<false>("Failed to serialize 
LRUDumpEntryGroupPb");
+    }
+    LOG(INFO) << "Serialized size: " << serialized.size();
+    out.write(serialized.data(), serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, filename));
+
+    // Record group metadata
+    ::doris::io::cache::EntryGroupOffsetSizePb* group_info = 
_dump_meta.add_group_offset_size();
+    group_info->set_offset(group_start);
+    group_info->set_size(serialized.size());
+
+    // Reset for next group
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+    return Status::OK();
+}
+
+Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,
+                                     std::string& tmp_filename, std::string& 
final_filename,
+                                     size_t& file_size) {
+    // Flush any remaining entries
+    if (_current_dump_group_count > 0) {
+        RETURN_IF_ERROR(flush_current_group(out, tmp_filename));
+    }
+
+    // Write meta information
+    _dump_meta.set_entry_num(entry_num);
+    size_t meta_offset = out.tellp();
+    LOG(INFO) << "dump meta: " << _dump_meta.DebugString();
+    std::string meta_serialized;
+    if (!_dump_meta.SerializeToString(&meta_serialized)) {
+        std::string warn_msg =
+                fmt::format("Failed to serialize LRUDumpMetaPb, file={}", 
tmp_filename);
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>("warn_msg");
+    }
+    out.write(meta_serialized.data(), meta_serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    // Write footer
+    Footer footer;
+    footer.meta_offset = htole64(meta_offset); // Explicitly convert to 
little-endian
+    footer.checksum = 0;                       // TODO: Calculate checksum
+    footer.version = 1;
+    std::memcpy(footer.magic, "DOR", 3);
+
+    out.write(reinterpret_cast<const char*>(&footer), sizeof(footer));
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    out.close();

Review Comment:
   No, we do not need an explicit sync here; close() will do the work.



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        return Status::InternalError<false>("Failed to serialize 
LRUDumpEntryGroupPb");
+    }
+    LOG(INFO) << "Serialized size: " << serialized.size();
+    out.write(serialized.data(), serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, filename));
+
+    // Record group metadata
+    ::doris::io::cache::EntryGroupOffsetSizePb* group_info = 
_dump_meta.add_group_offset_size();
+    group_info->set_offset(group_start);
+    group_info->set_size(serialized.size());

Review Comment:
   Calculate the checksum here (the footer currently writes checksum = 0 with a TODO).



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        return Status::InternalError<false>("Failed to serialize 
LRUDumpEntryGroupPb");
+    }
+    LOG(INFO) << "Serialized size: " << serialized.size();

Review Comment:
   Also log the group number here, alongside the serialized size.



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        return Status::InternalError<false>("Failed to serialize 
LRUDumpEntryGroupPb");
+    }
+    LOG(INFO) << "Serialized size: " << serialized.size();
+    out.write(serialized.data(), serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, filename));
+
+    // Record group metadata
+    ::doris::io::cache::EntryGroupOffsetSizePb* group_info = 
_dump_meta.add_group_offset_size();
+    group_info->set_offset(group_start);
+    group_info->set_size(serialized.size());
+
+    // Reset for next group
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+    return Status::OK();
+}
+
+Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,
+                                     std::string& tmp_filename, std::string& 
final_filename,
+                                     size_t& file_size) {
+    // Flush any remaining entries
+    if (_current_dump_group_count > 0) {
+        RETURN_IF_ERROR(flush_current_group(out, tmp_filename));
+    }
+
+    // Write meta information
+    _dump_meta.set_entry_num(entry_num);
+    size_t meta_offset = out.tellp();
+    LOG(INFO) << "dump meta: " << _dump_meta.DebugString();
+    std::string meta_serialized;
+    if (!_dump_meta.SerializeToString(&meta_serialized)) {
+        std::string warn_msg =
+                fmt::format("Failed to serialize LRUDumpMetaPb, file={}", 
tmp_filename);
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>("warn_msg");
+    }
+    out.write(meta_serialized.data(), meta_serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    // Write footer
+    Footer footer;
+    footer.meta_offset = htole64(meta_offset); // Explicitly convert to 
little-endian
+    footer.checksum = 0;                       // TODO: Calculate checksum
+    footer.version = 1;
+    std::memcpy(footer.magic, "DOR", 3);
+
+    out.write(reinterpret_cast<const char*>(&footer), sizeof(footer));
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    out.close();
+
+    // Rename tmp to formal file
+    if (std::rename(tmp_filename.c_str(), final_filename.c_str()) != 0) {
+        std::remove(tmp_filename.c_str());
+        file_size = std::filesystem::file_size(final_filename);
+    }

Review Comment:
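   Log a WARNING if the rename fails. Also, `file_size` is currently computed only inside the failure branch, from `final_filename`, which the failed rename did not produce. It should be computed when the rename succeeds.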



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        return Status::InternalError<false>("Failed to serialize 
LRUDumpEntryGroupPb");
+    }
+    LOG(INFO) << "Serialized size: " << serialized.size();
+    out.write(serialized.data(), serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, filename));
+
+    // Record group metadata
+    ::doris::io::cache::EntryGroupOffsetSizePb* group_info = 
_dump_meta.add_group_offset_size();
+    group_info->set_offset(group_start);
+    group_info->set_size(serialized.size());
+
+    // Reset for next group
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+    return Status::OK();
+}
+
+Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,
+                                     std::string& tmp_filename, std::string& 
final_filename,
+                                     size_t& file_size) {
+    // Flush any remaining entries
+    if (_current_dump_group_count > 0) {
+        RETURN_IF_ERROR(flush_current_group(out, tmp_filename));
+    }
+
+    // Write meta information
+    _dump_meta.set_entry_num(entry_num);
+    size_t meta_offset = out.tellp();
+    LOG(INFO) << "dump meta: " << _dump_meta.DebugString();
+    std::string meta_serialized;
+    if (!_dump_meta.SerializeToString(&meta_serialized)) {
+        std::string warn_msg =
+                fmt::format("Failed to serialize LRUDumpMetaPb, file={}", 
tmp_filename);
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>("warn_msg");
+    }
+    out.write(meta_serialized.data(), meta_serialized.size());
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    // Write footer
+    Footer footer;
+    footer.meta_offset = htole64(meta_offset); // Explicitly convert to 
little-endian
+    footer.checksum = 0;                       // TODO: Calculate checksum
+    footer.version = 1;
+    std::memcpy(footer.magic, "DOR", 3);
+
+    out.write(reinterpret_cast<const char*>(&footer), sizeof(footer));
+    RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename));
+
+    out.close();
+
+    // Rename tmp to formal file
+    if (std::rename(tmp_filename.c_str(), final_filename.c_str()) != 0) {
+        std::remove(tmp_filename.c_str());
+        file_size = std::filesystem::file_size(final_filename);
+    }
+    _dump_meta.Clear();
+    _current_dump_group.Clear();
+    _current_dump_group_count = 0;
+
+    return Status::OK();
+}
+
+void CacheLRUDumper::dump_queue(LRUQueue& queue, const std::string& 
queue_name) {
+    Status st;
+    std::vector<std::tuple<UInt128Wrapper, size_t, size_t>> elements;
+    elements.reserve(config::file_cache_background_lru_dump_tail_record_num);
+
+    {
+        std::lock_guard<std::mutex> lru_log_lock(_mgr->_mutex_lru_log);
+        size_t count = 0;
+        for (const auto& [hash, offset, size] : queue) {
+            if (count++ >= 
config::file_cache_background_lru_dump_tail_record_num) break;
+            elements.emplace_back(hash, offset, size);
+        }
+    }
+
+    // Write to disk
+    int64_t duration_ns = 0;
+    std::uintmax_t file_size = 0;
+    {
+        SCOPED_RAW_TIMER(&duration_ns);
+        std::string tmp_filename =
+                fmt::format("{}/lru_dump_{}.bin.tmp", _mgr->_cache_base_path, 
queue_name);
+        std::string final_filename =
+                fmt::format("{}/lru_dump_{}.bin", _mgr->_cache_base_path, 
queue_name);
+        std::ofstream out(tmp_filename, std::ios::binary);
+        if (out) {
+            LOG(INFO) << "begin dump " << queue_name << "with " << 
elements.size() << " elements";
+            for (const auto& [hash, offset, size] : elements) {
+                RETURN_IF_STATUS_ERROR(st,
+                                       dump_one_lru_entry(out, tmp_filename, 
hash, offset, size));
+            }
+            RETURN_IF_STATUS_ERROR(st, finalize_dump(out, elements.size(), 
tmp_filename,
+                                                     final_filename, 
file_size));
+        } else {
+            LOG(WARNING) << "open lru dump file failed";
+        }
+    }
+    *(_mgr->_lru_dump_latency_us) << (duration_ns / 1000);
+    LOG(INFO) << fmt::format("lru dump for {} size={} time={}us", queue_name, 
file_size,
+                             duration_ns / 1000);
+};
+
+Status CacheLRUDumper::parse_dump_footer(std::ifstream& in, std::string& 
filename,
+                                         size_t& entry_num) {
+    size_t file_size = std::filesystem::file_size(filename);
+
+    // Read footer
+    Footer footer;
+    size_t footer_size = sizeof(footer);
+    if (file_size < footer_size) {
+        std::string warn_msg = std::string(fmt::format(
+                "LRU dump file too small to contain footer, file={}, skip 
restore", filename));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    in.seekg(-footer_size, std::ios::end);
+    in.read(reinterpret_cast<char*>(&footer), footer_size);
+    RETURN_IF_ERROR(check_ifstream_status(in, filename));
+
+    // Convert from little-endian to host byte order
+    footer.meta_offset = le64toh(footer.meta_offset);
+
+    // Validate footer
+    if (footer.version != 1 || std::string(footer.magic, 3) != "DOR") {
+        std::string warn_msg = std::string(fmt::format(
+                "LRU dump file invalid footer format, file={}, skip restore", 
filename));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    // Read meta
+    in.seekg(footer.meta_offset, std::ios::beg);
+    size_t meta_size = file_size - footer.meta_offset - footer_size;
+    if (meta_size <= 0) {
+        std::string warn_msg = std::string(
+                fmt::format("LRU dump file invalid meta size, file={}, skip 
restore", filename));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+    std::string meta_serialized(meta_size, '\0');
+    in.read(&meta_serialized[0], meta_serialized.size());
+    RETURN_IF_ERROR(check_ifstream_status(in, filename));
+    _parse_meta.Clear();
+    _current_parse_group.Clear();
+    if (!_parse_meta.ParseFromString(meta_serialized)) {
+        std::string warn_msg = std::string(
+                fmt::format("LRU dump file meta parse failed, file={}, skip 
restore", filename));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+    LOG(INFO) << "parse meta: " << _parse_meta.DebugString();
+
+    entry_num = _parse_meta.entry_num();
+    return Status::OK();
+}
+
+Status CacheLRUDumper::parse_one_lru_entry(std::ifstream& in, std::string& 
filename,
+                                           UInt128Wrapper& hash, size_t& 
offset, size_t& size) {
+    // Read next group if current is empty
+    if (_current_parse_group.entries_size() == 0) {
+        if (_parse_meta.group_offset_size_size() == 0) {
+            return Status::EndOfFile("No more entries");
+        }
+
+        auto group_info = _parse_meta.group_offset_size(0);
+        in.seekg(group_info.offset(), std::ios::beg);
+        std::string group_serialized(group_info.size(), '\0');
+        in.read(&group_serialized[0], group_serialized.size());
+        RETURN_IF_ERROR(check_ifstream_status(in, filename));
+
+        LOG(INFO) << "Deserializing group of size: " << 
group_serialized.size();
+        if (!_current_parse_group.ParseFromString(group_serialized)) {
+            std::string warn_msg =
+                    fmt::format("restore lru failed to parse group, file={}", 
filename);
+            LOG(WARNING) << warn_msg;
+            return Status::InternalError(warn_msg);
+        }
+
+        // Remove processed group info
+        
_parse_meta.mutable_group_offset_size()->erase(_parse_meta.group_offset_size().begin());
+    }
+
+    // Get next entry from current group
+    LOG(INFO) << "After deserialization: " << 
_current_parse_group.DebugString();
+    auto entry = _current_parse_group.entries(0);
+    hash = UInt128Wrapper((static_cast<uint128_t>(entry.hash().high()) << 64) 
| entry.hash().low());
+    offset = entry.offset();
+    size = entry.size();
+
+    // Remove processed entry
+    
_current_parse_group.mutable_entries()->erase(_current_parse_group.entries().begin());
+    return Status::OK();
+}
+
+void CacheLRUDumper::restore_queue(LRUQueue& queue, const std::string& 
queue_name,
+                                   std::lock_guard<std::mutex>& cache_lock) {
+    Status st;
+    std::string filename = fmt::format("{}/lru_dump_{}.bin", 
_mgr->_cache_base_path, queue_name);
+    std::ifstream in(filename, std::ios::binary);
+    int64_t duration_ns = 0;
+    if (in) {
+        LOG(INFO) << "lru dump file is founded for " << queue_name << ". 
starting lru restore.";
+
+        SCOPED_RAW_TIMER(&duration_ns);
+        size_t entry_num = 0;
+        RETURN_IF_STATUS_ERROR(st, parse_dump_footer(in, filename, entry_num));
+        in.seekg(0, std::ios::beg);
+        UInt128Wrapper hash;
+        size_t offset, size;
+        for (int i = 0; i < entry_num; ++i) {
+            RETURN_IF_STATUS_ERROR(st, parse_one_lru_entry(in, filename, hash, 
offset, size));
+            CacheContext ctx;
+            if (queue_name == "ttl") {
+                ctx.cache_type = FileCacheType::TTL;
+                // TODO(zhengyu): we haven't persist expiration time yet, use 
3h default
+                // TODO(zhengyu): we don't use stats yet, see if this will 
cause any problem
+                ctx.expiration_time = 10800;
+            } else if (queue_name == "index") {
+                ctx.cache_type = FileCacheType::INDEX;
+            } else if (queue_name == "normal") {
+                ctx.cache_type = FileCacheType::NORMAL;
+            } else if (queue_name == "disposable") {
+                ctx.cache_type = FileCacheType::DISPOSABLE;
+            } else {
+                LOG_WARNING("unknown queue type");
+                DCHECK(false);
+                return;
+            }
+            _mgr->add_cell(hash, ctx, offset, size, 
FileBlock::State::DOWNLOADED, cache_lock);
+        }
+        in.close();
+    } else {
+        LOG(INFO) << "no lru dump file is founded for " << queue_name;
+    }
+    LOG(INFO) << "lru restore time costs: " << (duration_ns / 1000 / 1000) << 
"ms.";

Review Comment:
   Report this in microseconds (us) instead of milliseconds, to stay consistent with the LRU dump timing log.



##########
be/src/io/cache/cache_lru_dumper.cpp:
##########
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/cache_lru_dumper.h"
+
+#include "io/cache/block_file_cache.h"
+#include "io/cache/cache_lru_dumper.h"
+
+namespace doris::io {
+Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& 
filename) {
+    if (!out.good()) {
+        std::ios::iostate state = out.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        out.close();
+        std::string warn_msg = fmt::format("dump lru writing failed, file={}, 
{}", filename,
+                                           err_msg.str().c_str());
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& 
filename) {
+    if (!in.good()) {
+        std::ios::iostate state = in.rdstate();
+        std::stringstream err_msg;
+        if (state & std::ios::eofbit) {
+            err_msg << "End of file reached.";
+        }
+        if (state & std::ios::failbit) {
+            err_msg << "Input/output operation failed, err_code: " << 
strerror(errno);
+        }
+        if (state & std::ios::badbit) {
+            err_msg << "Serious I/O error occurred, err_code: " << 
strerror(errno);
+        }
+        in.close();
+        std::string warn_msg = std::string(
+                fmt::format("dump lru reading failed, file={}, {}", filename, 
err_msg.str()));
+        LOG(WARNING) << warn_msg;
+        return Status::InternalError<false>(warn_msg);
+    }
+
+    return Status::OK();
+}
+
+Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& 
filename,
+                                          const UInt128Wrapper& hash, size_t 
offset, size_t size) {
+    // Dump file format description:
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_1                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_2                         |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_3                         |
+    // +-----------------------------------------------+
+    // | ...                                           |
+    // +-----------------------------------------------+
+    // | LRUDumpEntryGroupPb_n                         |
+    // +-----------------------------------------------+
+    // | LRUDumpMetaPb (queueName, Group<off,size>List)|
+    // +-----------------------------------------------+
+    // | FOOTER_OFFSET (8Bytes)                        |
+    // +-----------------------------------------------+
+    // | CHECKSUM (4Bytes)|VERSION (1Byte)| MAGIC     |
+    // +-----------------------------------------------+
+    //
+    // why we are not using protobuf as a whole?
+    // AFAIK, current protobuf version dose not support streaming mode,
+    // so that we need to store all the message in memory which will
+    // consume loads of RAMs.
+    // Instead, we use protobuf serialize each of the single entry
+    // and provide the version field in the footer for upgrade
+
+    ::doris::io::cache::LRUDumpEntryPb* entry = 
_current_dump_group.add_entries();
+    ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash();
+    hash_pb->set_high(hash.high());
+    hash_pb->set_low(hash.low());
+    entry->set_offset(offset);
+    entry->set_size(size);
+
+    _current_dump_group_count++;
+    if (_current_dump_group_count >= 10000) {
+        RETURN_IF_ERROR(flush_current_group(out, filename));
+    }
+    return Status::OK();
+}
+
+Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& 
filename) {
+    if (_current_dump_group_count == 0) {
+        return Status::OK();
+    }
+
+    // Record current position as group start offset
+    size_t group_start = out.tellp();
+
+    // Serialize and write the group
+    std::string serialized;
+    LOG(INFO) << "Before serialization: " << _current_dump_group.DebugString();
+    if (!_current_dump_group.SerializeToString(&serialized)) {
+        return Status::InternalError<false>("Failed to serialize 
LRUDumpEntryGroupPb");
+    }
+    LOG(INFO) << "Serialized size: " << serialized.size();
+    out.write(serialized.data(), serialized.size());

Review Comment:
   Dump performance is not critical; the key concern is the number of I/O operations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to