github-actions[bot] commented on code in PR #39452:
URL: https://github.com/apache/doris/pull/39452#discussion_r1718698155


##########
be/src/io/cache/file_block.h:
##########
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <fmt/format.h>

Review Comment:
   warning: 'fmt/format.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <fmt/format.h>
            ^
   ```
   



##########
be/src/io/cache/block_file_cache.h:
##########
@@ -0,0 +1,460 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bvar/bvar.h>

Review Comment:
   warning: 'bvar/bvar.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <bvar/bvar.h>
            ^
   ```
   



##########
be/src/olap/options.h:
##########
@@ -18,14 +18,14 @@
 #pragma once
 
 #include <gen_cpp/Types_types.h>

Review Comment:
   warning: 'gen_cpp/Types_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/Types_types.h>
            ^
   ```
   



##########
be/src/io/cache/fs_file_cache_storage.cpp:
##########
@@ -0,0 +1,583 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/fs_file_cache_storage.h"
+
+#include <filesystem>
+#include <mutex>
+#include <system_error>
+
+#include "common/logging.h"
+#include "common/sync_point.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/file_block.h"
+#include "io/cache/file_cache_common.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/file_writer.h"
+#include "runtime/exec_env.h"
+#include "vec/common/hex.h"
+
+namespace doris::io {
+
+struct BatchLoadArgs {
+    UInt128Wrapper hash;
+    CacheContext ctx;
+    uint64_t offset;
+    size_t size;
+    std::string key_path;
+    std::string offset_path;
+    bool is_tmp;
+};
+
+FDCache* FDCache::instance() {
+    return ExecEnv::GetInstance()->file_cache_open_fd_cache();
+}
+
+std::shared_ptr<FileReader> FDCache::get_file_reader(const AccessKeyAndOffset& 
key) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return nullptr;
+    }
+    std::shared_lock rlock(_mtx);
+    if (auto iter = _file_name_to_reader.find(key); iter != 
_file_name_to_reader.end()) {
+        return iter->second->second;
+    }
+    return nullptr;
+}
+
+void FDCache::insert_file_reader(const AccessKeyAndOffset& key,
+                                 std::shared_ptr<FileReader> file_reader) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return;
+    }
+    std::lock_guard wlock(_mtx);
+
+    if (auto iter = _file_name_to_reader.find(key); iter == 
_file_name_to_reader.end()) {
+        if (config::file_cache_max_file_reader_cache_size == 
_file_reader_list.size()) {
+            _file_name_to_reader.erase(_file_reader_list.back().first);
+            _file_reader_list.pop_back();
+        }
+        _file_reader_list.emplace_front(key, std::move(file_reader));
+        _file_name_to_reader.insert(std::make_pair(key, 
_file_reader_list.begin()));
+    }
+}
+
+void FDCache::remove_file_reader(const AccessKeyAndOffset& key) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return;
+    }
+    std::lock_guard wlock(_mtx);
+    if (auto iter = _file_name_to_reader.find(key); iter != 
_file_name_to_reader.end()) {
+        _file_reader_list.erase(iter->second);
+        _file_name_to_reader.erase(key);
+    }
+}
+
+bool FDCache::contains_file_reader(const AccessKeyAndOffset& key) {
+    std::shared_lock rlock(_mtx);
+    return _file_name_to_reader.contains(key);
+}
+
+size_t FDCache::file_reader_cache_size() {
+    std::shared_lock rlock(_mtx);
+    return _file_reader_list.size();
+}
+
+Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
+    _cache_base_path = _mgr->_cache_base_path;
+    RETURN_IF_ERROR(rebuild_data_structure());
+    _cache_background_load_thread = std::thread([this, mgr = _mgr]() {
+        load_cache_info_into_memory(mgr);
+        mgr->_lazy_open_done = true;
+        LOG_INFO("FileCache {} lazy load done.", _cache_base_path);
+    });
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::append(const FileCacheKey& key, const Slice& value) 
{
+    FileWriter* writer = nullptr;
+    {
+        std::lock_guard lock(_mtx);
+        auto file_writer_map_key = std::make_pair(key.hash, key.offset);
+        if (auto iter = _key_to_writer.find(file_writer_map_key); iter != 
_key_to_writer.end()) {
+            writer = iter->second.get();
+        } else {
+            std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+            auto st = fs->create_directory(dir, false);
+            if (!st.ok() && !st.is<ErrorCode::ALREADY_EXIST>()) {
+                return st;
+            }
+            std::string tmp_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type, true);
+            FileWriterPtr file_writer;
+            FileWriterOptions opts {.sync_file_data = false};
+            RETURN_IF_ERROR(fs->create_file(tmp_file, &file_writer, &opts));
+            writer = file_writer.get();
+            _key_to_writer.emplace(file_writer_map_key, 
std::move(file_writer));
+        }
+    }
+    DCHECK_NE(writer, nullptr);
+    return writer->append(value);
+}
+
+Status FSFileCacheStorage::finalize(const FileCacheKey& key) {
+    FileWriterPtr file_writer;
+    {
+        std::lock_guard lock(_mtx);
+        auto file_writer_map_key = std::make_pair(key.hash, key.offset);
+        auto iter = _key_to_writer.find(file_writer_map_key);
+        DCHECK(iter != _key_to_writer.end());
+        file_writer = std::move(iter->second);
+        _key_to_writer.erase(iter);
+    }
+    if (!file_writer->is_closed()) {
+        RETURN_IF_ERROR(file_writer->close());
+    }
+    std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+    std::string true_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type);
+    return fs->rename(file_writer->path(), true_file);
+}
+
+Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, 
Slice buffer) {
+    AccessKeyAndOffset fd_key = std::make_pair(key.hash, key.offset);
+    FileReaderSPtr file_reader = FDCache::instance()->get_file_reader(fd_key);
+    if (!file_reader) {
+        std::string file =
+                get_path_in_local_cache(get_path_in_local_cache(key.hash, 
key.meta.expiration_time),
+                                        key.offset, key.meta.type);
+        RETURN_IF_ERROR(fs->open_file(file, &file_reader));
+        FDCache::instance()->insert_file_reader(fd_key, file_reader);
+    }
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(file_reader->read_at(value_offset, buffer, &bytes_read));
+    DCHECK(bytes_read == buffer.get_size());
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::remove(const FileCacheKey& key) {
+    std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+    std::string file = get_path_in_local_cache(dir, key.offset, key.meta.type);
+    FDCache::instance()->remove_file_reader(std::make_pair(key.hash, 
key.offset));
+    RETURN_IF_ERROR(fs->delete_file(file));
+    std::vector<FileInfo> files;
+    bool exists {false};
+    RETURN_IF_ERROR(fs->list(dir, true, &files, &exists));
+    DCHECK(exists);
+    if (files.empty()) {
+        RETURN_IF_ERROR(fs->delete_directory(dir));
+    }
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::change_key_meta(const FileCacheKey& key, const 
KeyMeta& new_meta) {
+    // TTL change
+    if (key.meta.expiration_time != new_meta.expiration_time) {
+        std::string original_dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+        std::string new_dir = get_path_in_local_cache(key.hash, 
new_meta.expiration_time);
+        // It will be concurrent, but we don't care who rename
+        Status st = fs->rename(original_dir, new_dir);
+        if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
+            return st;
+        }
+    } else if (key.meta.type != new_meta.type) {
+        std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+        std::string original_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type);
+        std::string new_file = get_path_in_local_cache(dir, key.offset, 
new_meta.type);
+        RETURN_IF_ERROR(fs->rename(original_file, new_file));
+    }
+    return Status::OK();
+}
+
+std::string FSFileCacheStorage::get_path_in_local_cache(const std::string& 
dir, size_t offset,
+                                                        FileCacheType type, 
bool is_tmp) {
+    return Path(dir) / (std::to_string(offset) +
+                        (is_tmp ? "_tmp" : 
BlockFileCache::cache_type_to_string(type)));
+}
+
+std::string FSFileCacheStorage::get_path_in_local_cache(const UInt128Wrapper& 
value,
+                                                        uint64_t 
expiration_time) const {
+    auto str = value.to_string();
+    try {
+        if constexpr (USE_CACHE_VERSION2) {
+            return Path(_cache_base_path) / str.substr(0, KEY_PREFIX_LENGTH) /
+                   (str + "_" + std::to_string(expiration_time));
+        } else {
+            return Path(_cache_base_path) / (str + "_" + 
std::to_string(expiration_time));
+        }
+    } catch (std::filesystem::filesystem_error& e) {
+        LOG_WARNING("fail to get_path_in_local_cache")
+                .tag("err", e.what())
+                .tag("key", value.to_string())
+                .tag("expiration_time", expiration_time);
+        return "";
+    }
+}
+
+Status FSFileCacheStorage::rebuild_data_structure() const {
+    /// version 1.0: cache_base_path / key / offset
+    /// version 2.0: cache_base_path / key_prefix / key / offset
+    std::string version;
+    RETURN_IF_ERROR(read_file_cache_version(&version));
+    if (USE_CACHE_VERSION2 && version != "2.0") {
+        // move directories format as version 2.0
+        std::error_code ec;
+        std::filesystem::directory_iterator key_it {_cache_base_path, ec};
+        if (ec) {
+            return Status::InternalError("Failed to list dir {}: {}", 
_cache_base_path,
+                                         ec.message());
+        }
+        for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
+            if (key_it->is_directory()) {
+                std::string cache_key = key_it->path().filename().native();
+                if (cache_key.size() > KEY_PREFIX_LENGTH) {
+                    std::string key_prefix =
+                            Path(_cache_base_path) / cache_key.substr(0, 
KEY_PREFIX_LENGTH);
+                    bool exists = false;
+                    RETURN_IF_ERROR(fs->exists(key_prefix, &exists));
+                    if (!exists) {
+                        RETURN_IF_ERROR(fs->create_directory(key_prefix));
+                    }
+                    RETURN_IF_ERROR(fs->rename(key_it->path(), key_prefix / 
cache_key));
+                }
+            }
+        }
+        if (!write_file_cache_version().ok()) {
+            return Status::InternalError("Failed to write version hints for 
file cache");
+        }
+    }
+
+    auto rebuild_dir = [&](std::filesystem::directory_iterator& 
upgrade_key_it) -> Status {
+        for (; upgrade_key_it != std::filesystem::directory_iterator(); 
++upgrade_key_it) {
+            if (upgrade_key_it->path().filename().native().find('_') == 
std::string::npos) {
+                
RETURN_IF_ERROR(fs->delete_directory(upgrade_key_it->path().native() + "_0"));
+                RETURN_IF_ERROR(
+                        fs->rename(upgrade_key_it->path(), 
upgrade_key_it->path().native() + "_0"));
+            }
+        }
+        return Status::OK();
+    };
+    std::error_code ec;
+    if constexpr (USE_CACHE_VERSION2) {
+        std::filesystem::directory_iterator key_prefix_it {_cache_base_path, 
ec};
+        if (ec) [[unlikely]] {
+            LOG(WARNING) << ec.message();
+            return Status::IOError(ec.message());
+        }
+        for (; key_prefix_it != std::filesystem::directory_iterator(); 
++key_prefix_it) {
+            if (!key_prefix_it->is_directory()) {
+                // maybe version hits file
+                continue;
+            }
+            if (key_prefix_it->path().filename().native().size() != 
KEY_PREFIX_LENGTH) {
+                LOG(WARNING) << "Unknown directory " << 
key_prefix_it->path().native()
+                             << ", try to remove it";
+                RETURN_IF_ERROR(fs->delete_directory(key_prefix_it->path()));
+            }
+            std::filesystem::directory_iterator key_it {key_prefix_it->path(), 
ec};
+            if (ec) [[unlikely]] {
+                return Status::IOError(ec.message());
+            }
+            RETURN_IF_ERROR(rebuild_dir(key_it));
+        }
+    } else {
+        std::filesystem::directory_iterator key_it {_cache_base_path, ec};
+        if (ec) [[unlikely]] {
+            return Status::IOError(ec.message());
+        }
+        RETURN_IF_ERROR(rebuild_dir(key_it));
+    }
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::write_file_cache_version() const {
+    if constexpr (USE_CACHE_VERSION2) {
+        std::string version_path = get_version_path();
+        Slice version("2.0");
+        FileWriterPtr version_writer;
+        RETURN_IF_ERROR(fs->create_file(version_path, &version_writer));
+        RETURN_IF_ERROR(version_writer->append(version));
+        return version_writer->close();
+    }
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::read_file_cache_version(std::string* buffer) const {
+    std::string version_path = get_version_path();
+    bool exists = false;
+    RETURN_IF_ERROR(fs->exists(version_path, &exists));
+    if (!exists) {
+        *buffer = "1.0";
+        return Status::OK();
+    }
+    FileReaderSPtr version_reader;
+    int64_t file_size = -1;
+    RETURN_IF_ERROR(fs->file_size(version_path, &file_size));
+    buffer->resize(file_size);
+    RETURN_IF_ERROR(fs->open_file(version_path, &version_reader));
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(version_reader->read_at(0, Slice(buffer->data(), 
file_size), &bytes_read));
+    RETURN_IF_ERROR(version_reader->close());
+    return Status::OK();
+}
+
+std::string FSFileCacheStorage::get_version_path() const {
+    return Path(_cache_base_path) / "version";
+}
+
+void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) 
const {

Review Comment:
   warning: function 'load_cache_info_into_memory' exceeds recommended 
size/complexity thresholds [readability-function-size]
   ```cpp
   void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) 
const {
                            ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/io/cache/fs_file_cache_storage.cpp:338:** 152 lines including 
whitespace and comments (threshold 80)
   ```cpp
   void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) 
const {
                            ^
   ```
   
   </details>
   



##########
be/src/io/cache/fs_file_cache_storage.cpp:
##########
@@ -0,0 +1,583 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/fs_file_cache_storage.h"
+
+#include <filesystem>
+#include <mutex>
+#include <system_error>
+
+#include "common/logging.h"
+#include "common/sync_point.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/file_block.h"
+#include "io/cache/file_cache_common.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/file_writer.h"
+#include "runtime/exec_env.h"
+#include "vec/common/hex.h"
+
+namespace doris::io {
+
+struct BatchLoadArgs {
+    UInt128Wrapper hash;
+    CacheContext ctx;
+    uint64_t offset;
+    size_t size;
+    std::string key_path;
+    std::string offset_path;
+    bool is_tmp;
+};
+
+FDCache* FDCache::instance() {
+    return ExecEnv::GetInstance()->file_cache_open_fd_cache();
+}
+
+std::shared_ptr<FileReader> FDCache::get_file_reader(const AccessKeyAndOffset& 
key) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return nullptr;
+    }
+    std::shared_lock rlock(_mtx);
+    if (auto iter = _file_name_to_reader.find(key); iter != 
_file_name_to_reader.end()) {
+        return iter->second->second;
+    }
+    return nullptr;
+}
+
+void FDCache::insert_file_reader(const AccessKeyAndOffset& key,
+                                 std::shared_ptr<FileReader> file_reader) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return;
+    }
+    std::lock_guard wlock(_mtx);
+
+    if (auto iter = _file_name_to_reader.find(key); iter == 
_file_name_to_reader.end()) {
+        if (config::file_cache_max_file_reader_cache_size == 
_file_reader_list.size()) {
+            _file_name_to_reader.erase(_file_reader_list.back().first);
+            _file_reader_list.pop_back();
+        }
+        _file_reader_list.emplace_front(key, std::move(file_reader));
+        _file_name_to_reader.insert(std::make_pair(key, 
_file_reader_list.begin()));
+    }
+}
+
+void FDCache::remove_file_reader(const AccessKeyAndOffset& key) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return;
+    }
+    std::lock_guard wlock(_mtx);
+    if (auto iter = _file_name_to_reader.find(key); iter != 
_file_name_to_reader.end()) {
+        _file_reader_list.erase(iter->second);
+        _file_name_to_reader.erase(key);
+    }
+}
+
+bool FDCache::contains_file_reader(const AccessKeyAndOffset& key) {
+    std::shared_lock rlock(_mtx);
+    return _file_name_to_reader.contains(key);
+}
+
+size_t FDCache::file_reader_cache_size() {
+    std::shared_lock rlock(_mtx);
+    return _file_reader_list.size();
+}
+
+Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
+    _cache_base_path = _mgr->_cache_base_path;
+    RETURN_IF_ERROR(rebuild_data_structure());
+    _cache_background_load_thread = std::thread([this, mgr = _mgr]() {
+        load_cache_info_into_memory(mgr);
+        mgr->_lazy_open_done = true;
+        LOG_INFO("FileCache {} lazy load done.", _cache_base_path);
+    });
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::append(const FileCacheKey& key, const Slice& value) 
{
+    FileWriter* writer = nullptr;
+    {
+        std::lock_guard lock(_mtx);
+        auto file_writer_map_key = std::make_pair(key.hash, key.offset);
+        if (auto iter = _key_to_writer.find(file_writer_map_key); iter != 
_key_to_writer.end()) {
+            writer = iter->second.get();
+        } else {
+            std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+            auto st = fs->create_directory(dir, false);
+            if (!st.ok() && !st.is<ErrorCode::ALREADY_EXIST>()) {
+                return st;
+            }
+            std::string tmp_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type, true);
+            FileWriterPtr file_writer;
+            FileWriterOptions opts {.sync_file_data = false};
+            RETURN_IF_ERROR(fs->create_file(tmp_file, &file_writer, &opts));
+            writer = file_writer.get();
+            _key_to_writer.emplace(file_writer_map_key, 
std::move(file_writer));
+        }
+    }
+    DCHECK_NE(writer, nullptr);
+    return writer->append(value);
+}
+
+Status FSFileCacheStorage::finalize(const FileCacheKey& key) {
+    FileWriterPtr file_writer;
+    {
+        std::lock_guard lock(_mtx);
+        auto file_writer_map_key = std::make_pair(key.hash, key.offset);
+        auto iter = _key_to_writer.find(file_writer_map_key);
+        DCHECK(iter != _key_to_writer.end());
+        file_writer = std::move(iter->second);
+        _key_to_writer.erase(iter);
+    }
+    if (!file_writer->is_closed()) {
+        RETURN_IF_ERROR(file_writer->close());
+    }
+    std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+    std::string true_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type);
+    return fs->rename(file_writer->path(), true_file);
+}
+
+Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, 
Slice buffer) {
+    AccessKeyAndOffset fd_key = std::make_pair(key.hash, key.offset);
+    FileReaderSPtr file_reader = FDCache::instance()->get_file_reader(fd_key);
+    if (!file_reader) {
+        std::string file =
+                get_path_in_local_cache(get_path_in_local_cache(key.hash, 
key.meta.expiration_time),
+                                        key.offset, key.meta.type);
+        RETURN_IF_ERROR(fs->open_file(file, &file_reader));
+        FDCache::instance()->insert_file_reader(fd_key, file_reader);
+    }
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(file_reader->read_at(value_offset, buffer, &bytes_read));
+    DCHECK(bytes_read == buffer.get_size());
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::remove(const FileCacheKey& key) {
+    std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+    std::string file = get_path_in_local_cache(dir, key.offset, key.meta.type);
+    FDCache::instance()->remove_file_reader(std::make_pair(key.hash, 
key.offset));
+    RETURN_IF_ERROR(fs->delete_file(file));
+    std::vector<FileInfo> files;
+    bool exists {false};
+    RETURN_IF_ERROR(fs->list(dir, true, &files, &exists));
+    DCHECK(exists);
+    if (files.empty()) {
+        RETURN_IF_ERROR(fs->delete_directory(dir));
+    }
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::change_key_meta(const FileCacheKey& key, const 
KeyMeta& new_meta) {
+    // TTL change
+    if (key.meta.expiration_time != new_meta.expiration_time) {
+        std::string original_dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+        std::string new_dir = get_path_in_local_cache(key.hash, 
new_meta.expiration_time);
+        // It will be concurrent, but we don't care who rename
+        Status st = fs->rename(original_dir, new_dir);
+        if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
+            return st;
+        }
+    } else if (key.meta.type != new_meta.type) {
+        std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+        std::string original_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type);
+        std::string new_file = get_path_in_local_cache(dir, key.offset, 
new_meta.type);
+        RETURN_IF_ERROR(fs->rename(original_file, new_file));
+    }
+    return Status::OK();
+}
+
+std::string FSFileCacheStorage::get_path_in_local_cache(const std::string& 
dir, size_t offset,
+                                                        FileCacheType type, 
bool is_tmp) {
+    return Path(dir) / (std::to_string(offset) +
+                        (is_tmp ? "_tmp" : 
BlockFileCache::cache_type_to_string(type)));
+}
+
+std::string FSFileCacheStorage::get_path_in_local_cache(const UInt128Wrapper& 
value,
+                                                        uint64_t 
expiration_time) const {
+    auto str = value.to_string();
+    try {
+        if constexpr (USE_CACHE_VERSION2) {
+            return Path(_cache_base_path) / str.substr(0, KEY_PREFIX_LENGTH) /
+                   (str + "_" + std::to_string(expiration_time));
+        } else {
+            return Path(_cache_base_path) / (str + "_" + 
std::to_string(expiration_time));
+        }
+    } catch (std::filesystem::filesystem_error& e) {
+        LOG_WARNING("fail to get_path_in_local_cache")
+                .tag("err", e.what())
+                .tag("key", value.to_string())
+                .tag("expiration_time", expiration_time);
+        return "";
+    }
+}
+
+Status FSFileCacheStorage::rebuild_data_structure() const {

Review Comment:
   warning: function 'rebuild_data_structure' has cognitive complexity of 119 
(threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status FSFileCacheStorage::rebuild_data_structure() const {
                              ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/io/cache/fs_file_cache_storage.cpp:231:** +1, including nesting 
penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(read_file_cache_version(&version));
       ^
   ```
   **be/src/common/status.h:609:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:231:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(read_file_cache_version(&version));
       ^
   ```
   **be/src/common/status.h:611:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:232:** +1, including nesting 
penalty of 0, nesting level increased to 1
   ```cpp
       if (USE_CACHE_VERSION2 && version != "2.0") {
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:232:** +1
   ```cpp
       if (USE_CACHE_VERSION2 && version != "2.0") {
                              ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:236:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (ec) {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:240:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:241:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (key_it->is_directory()) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:243:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   if (cache_key.size() > KEY_PREFIX_LENGTH) {
                   ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:247:** +5, including nesting 
penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(fs->exists(key_prefix, &exists));
                       ^
   ```
   **be/src/common/status.h:609:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:247:** +6, including nesting 
penalty of 5, nesting level increased to 6
   ```cpp
                       RETURN_IF_ERROR(fs->exists(key_prefix, &exists));
                       ^
   ```
   **be/src/common/status.h:611:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:248:** +5, including nesting 
penalty of 4, nesting level increased to 5
   ```cpp
                       if (!exists) {
                       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:249:** +6, including nesting 
penalty of 5, nesting level increased to 6
   ```cpp
                           RETURN_IF_ERROR(fs->create_directory(key_prefix));
                           ^
   ```
   **be/src/common/status.h:609:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:249:** +7, including nesting 
penalty of 6, nesting level increased to 7
   ```cpp
                           RETURN_IF_ERROR(fs->create_directory(key_prefix));
                           ^
   ```
   **be/src/common/status.h:611:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:251:** +5, including nesting 
penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(fs->rename(key_it->path(), key_prefix / 
cache_key));
                       ^
   ```
   **be/src/common/status.h:609:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:251:** +6, including nesting 
penalty of 5, nesting level increased to 6
   ```cpp
                       RETURN_IF_ERROR(fs->rename(key_it->path(), key_prefix / 
cache_key));
                       ^
   ```
   **be/src/common/status.h:611:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:255:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (!write_file_cache_version().ok()) {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:260:** nesting level increased 
to 1
   ```cpp
       auto rebuild_dir = [&](std::filesystem::directory_iterator& 
upgrade_key_it) -> Status {
                          ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:261:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           for (; upgrade_key_it != std::filesystem::directory_iterator(); 
++upgrade_key_it) {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:262:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (upgrade_key_it->path().filename().native().find('_') == 
std::string::npos) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:263:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   
RETURN_IF_ERROR(fs->delete_directory(upgrade_key_it->path().native() + "_0"));
                   ^
   ```
   **be/src/common/status.h:609:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:263:** +5, including nesting 
penalty of 4, nesting level increased to 5
   ```cpp
                   
RETURN_IF_ERROR(fs->delete_directory(upgrade_key_it->path().native() + "_0"));
                   ^
   ```
   **be/src/common/status.h:611:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:264:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(
                   ^
   ```
   **be/src/common/status.h:609:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:264:** +5, including nesting 
penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_ERROR(
                   ^
   ```
   **be/src/common/status.h:611:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:271:** +1, including nesting 
penalty of 0, nesting level increased to 1
   ```cpp
       if constexpr (USE_CACHE_VERSION2) {
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:273:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (ec) [[unlikely]] {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:277:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           for (; key_prefix_it != std::filesystem::directory_iterator(); 
++key_prefix_it) {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:278:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (!key_prefix_it->is_directory()) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:282:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (key_prefix_it->path().filename().native().size() != 
KEY_PREFIX_LENGTH) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:285:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(fs->delete_directory(key_prefix_it->path()));
                   ^
   ```
   **be/src/common/status.h:609:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:285:** +5, including nesting 
penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_ERROR(fs->delete_directory(key_prefix_it->path()));
                   ^
   ```
   **be/src/common/status.h:611:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:288:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (ec) [[unlikely]] {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:291:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(rebuild_dir(key_it));
               ^
   ```
   **be/src/common/status.h:609:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:291:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(rebuild_dir(key_it));
               ^
   ```
   **be/src/common/status.h:611:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:293:** +1, nesting level 
increased to 1
   ```cpp
       } else {
         ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:295:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (ec) [[unlikely]] {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:298:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(rebuild_dir(key_it));
           ^
   ```
   **be/src/common/status.h:609:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:298:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(rebuild_dir(key_it));
           ^
   ```
   **be/src/common/status.h:611:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   
   </details>
   



##########
be/src/io/cache/fs_file_cache_storage.cpp:
##########
@@ -0,0 +1,583 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/fs_file_cache_storage.h"
+
+#include <filesystem>
+#include <mutex>
+#include <system_error>
+
+#include "common/logging.h"
+#include "common/sync_point.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/file_block.h"
+#include "io/cache/file_cache_common.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/file_writer.h"
+#include "runtime/exec_env.h"
+#include "vec/common/hex.h"
+
+namespace doris::io {
+
+struct BatchLoadArgs {
+    UInt128Wrapper hash;
+    CacheContext ctx;
+    uint64_t offset;
+    size_t size;
+    std::string key_path;
+    std::string offset_path;
+    bool is_tmp;
+};
+
+FDCache* FDCache::instance() {
+    return ExecEnv::GetInstance()->file_cache_open_fd_cache();
+}
+
+std::shared_ptr<FileReader> FDCache::get_file_reader(const AccessKeyAndOffset& 
key) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return nullptr;
+    }
+    std::shared_lock rlock(_mtx);
+    if (auto iter = _file_name_to_reader.find(key); iter != 
_file_name_to_reader.end()) {
+        return iter->second->second;
+    }
+    return nullptr;
+}
+
+void FDCache::insert_file_reader(const AccessKeyAndOffset& key,
+                                 std::shared_ptr<FileReader> file_reader) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return;
+    }
+    std::lock_guard wlock(_mtx);
+
+    if (auto iter = _file_name_to_reader.find(key); iter == 
_file_name_to_reader.end()) {
+        if (config::file_cache_max_file_reader_cache_size == 
_file_reader_list.size()) {
+            _file_name_to_reader.erase(_file_reader_list.back().first);
+            _file_reader_list.pop_back();
+        }
+        _file_reader_list.emplace_front(key, std::move(file_reader));
+        _file_name_to_reader.insert(std::make_pair(key, 
_file_reader_list.begin()));
+    }
+}
+
+void FDCache::remove_file_reader(const AccessKeyAndOffset& key) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return;
+    }
+    std::lock_guard wlock(_mtx);
+    if (auto iter = _file_name_to_reader.find(key); iter != 
_file_name_to_reader.end()) {
+        _file_reader_list.erase(iter->second);
+        _file_name_to_reader.erase(key);
+    }
+}
+
+bool FDCache::contains_file_reader(const AccessKeyAndOffset& key) {
+    std::shared_lock rlock(_mtx);
+    return _file_name_to_reader.contains(key);
+}
+
+size_t FDCache::file_reader_cache_size() {
+    std::shared_lock rlock(_mtx);
+    return _file_reader_list.size();
+}
+
+Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
+    _cache_base_path = _mgr->_cache_base_path;
+    RETURN_IF_ERROR(rebuild_data_structure());
+    _cache_background_load_thread = std::thread([this, mgr = _mgr]() {
+        load_cache_info_into_memory(mgr);
+        mgr->_lazy_open_done = true;
+        LOG_INFO("FileCache {} lazy load done.", _cache_base_path);
+    });
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::append(const FileCacheKey& key, const Slice& value) 
{
+    FileWriter* writer = nullptr;
+    {
+        std::lock_guard lock(_mtx);
+        auto file_writer_map_key = std::make_pair(key.hash, key.offset);
+        if (auto iter = _key_to_writer.find(file_writer_map_key); iter != 
_key_to_writer.end()) {
+            writer = iter->second.get();
+        } else {
+            std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+            auto st = fs->create_directory(dir, false);
+            if (!st.ok() && !st.is<ErrorCode::ALREADY_EXIST>()) {
+                return st;
+            }
+            std::string tmp_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type, true);
+            FileWriterPtr file_writer;
+            FileWriterOptions opts {.sync_file_data = false};
+            RETURN_IF_ERROR(fs->create_file(tmp_file, &file_writer, &opts));
+            writer = file_writer.get();
+            _key_to_writer.emplace(file_writer_map_key, 
std::move(file_writer));
+        }
+    }
+    DCHECK_NE(writer, nullptr);
+    return writer->append(value);
+}
+
+Status FSFileCacheStorage::finalize(const FileCacheKey& key) {
+    FileWriterPtr file_writer;
+    {
+        std::lock_guard lock(_mtx);
+        auto file_writer_map_key = std::make_pair(key.hash, key.offset);
+        auto iter = _key_to_writer.find(file_writer_map_key);
+        DCHECK(iter != _key_to_writer.end());
+        file_writer = std::move(iter->second);
+        _key_to_writer.erase(iter);
+    }
+    if (!file_writer->is_closed()) {
+        RETURN_IF_ERROR(file_writer->close());
+    }
+    std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+    std::string true_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type);
+    return fs->rename(file_writer->path(), true_file);
+}
+
+Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, 
Slice buffer) {
+    AccessKeyAndOffset fd_key = std::make_pair(key.hash, key.offset);
+    FileReaderSPtr file_reader = FDCache::instance()->get_file_reader(fd_key);
+    if (!file_reader) {
+        std::string file =
+                get_path_in_local_cache(get_path_in_local_cache(key.hash, 
key.meta.expiration_time),
+                                        key.offset, key.meta.type);
+        RETURN_IF_ERROR(fs->open_file(file, &file_reader));
+        FDCache::instance()->insert_file_reader(fd_key, file_reader);
+    }
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(file_reader->read_at(value_offset, buffer, &bytes_read));
+    DCHECK(bytes_read == buffer.get_size());
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::remove(const FileCacheKey& key) {
+    std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+    std::string file = get_path_in_local_cache(dir, key.offset, key.meta.type);
+    FDCache::instance()->remove_file_reader(std::make_pair(key.hash, 
key.offset));
+    RETURN_IF_ERROR(fs->delete_file(file));
+    std::vector<FileInfo> files;
+    bool exists {false};
+    RETURN_IF_ERROR(fs->list(dir, true, &files, &exists));
+    DCHECK(exists);
+    if (files.empty()) {
+        RETURN_IF_ERROR(fs->delete_directory(dir));
+    }
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::change_key_meta(const FileCacheKey& key, const 
KeyMeta& new_meta) {
+    // TTL change
+    if (key.meta.expiration_time != new_meta.expiration_time) {
+        std::string original_dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+        std::string new_dir = get_path_in_local_cache(key.hash, 
new_meta.expiration_time);
+        // It will be concurrent, but we don't care who rename
+        Status st = fs->rename(original_dir, new_dir);
+        if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
+            return st;
+        }
+    } else if (key.meta.type != new_meta.type) {
+        std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+        std::string original_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type);
+        std::string new_file = get_path_in_local_cache(dir, key.offset, 
new_meta.type);
+        RETURN_IF_ERROR(fs->rename(original_file, new_file));
+    }
+    return Status::OK();
+}
+
+std::string FSFileCacheStorage::get_path_in_local_cache(const std::string& 
dir, size_t offset,
+                                                        FileCacheType type, 
bool is_tmp) {
+    return Path(dir) / (std::to_string(offset) +
+                        (is_tmp ? "_tmp" : 
BlockFileCache::cache_type_to_string(type)));
+}
+
+std::string FSFileCacheStorage::get_path_in_local_cache(const UInt128Wrapper& 
value,
+                                                        uint64_t 
expiration_time) const {
+    auto str = value.to_string();
+    try {
+        if constexpr (USE_CACHE_VERSION2) {
+            return Path(_cache_base_path) / str.substr(0, KEY_PREFIX_LENGTH) /
+                   (str + "_" + std::to_string(expiration_time));
+        } else {
+            return Path(_cache_base_path) / (str + "_" + 
std::to_string(expiration_time));
+        }
+    } catch (std::filesystem::filesystem_error& e) {
+        LOG_WARNING("fail to get_path_in_local_cache")
+                .tag("err", e.what())
+                .tag("key", value.to_string())
+                .tag("expiration_time", expiration_time);
+        return "";
+    }
+}
+
+Status FSFileCacheStorage::rebuild_data_structure() const {
+    /// version 1.0: cache_base_path / key / offset
+    /// version 2.0: cache_base_path / key_prefix / key / offset
+    std::string version;
+    RETURN_IF_ERROR(read_file_cache_version(&version));
+    if (USE_CACHE_VERSION2 && version != "2.0") {
+        // move directories format as version 2.0
+        std::error_code ec;
+        std::filesystem::directory_iterator key_it {_cache_base_path, ec};
+        if (ec) {
+            return Status::InternalError("Failed to list dir {}: {}", 
_cache_base_path,
+                                         ec.message());
+        }
+        for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
+            if (key_it->is_directory()) {
+                std::string cache_key = key_it->path().filename().native();
+                if (cache_key.size() > KEY_PREFIX_LENGTH) {
+                    std::string key_prefix =
+                            Path(_cache_base_path) / cache_key.substr(0, 
KEY_PREFIX_LENGTH);
+                    bool exists = false;
+                    RETURN_IF_ERROR(fs->exists(key_prefix, &exists));
+                    if (!exists) {
+                        RETURN_IF_ERROR(fs->create_directory(key_prefix));
+                    }
+                    RETURN_IF_ERROR(fs->rename(key_it->path(), key_prefix / 
cache_key));
+                }
+            }
+        }
+        if (!write_file_cache_version().ok()) {
+            return Status::InternalError("Failed to write version hints for 
file cache");
+        }
+    }
+
+    auto rebuild_dir = [&](std::filesystem::directory_iterator& 
upgrade_key_it) -> Status {
+        for (; upgrade_key_it != std::filesystem::directory_iterator(); 
++upgrade_key_it) {
+            if (upgrade_key_it->path().filename().native().find('_') == 
std::string::npos) {
+                
RETURN_IF_ERROR(fs->delete_directory(upgrade_key_it->path().native() + "_0"));
+                RETURN_IF_ERROR(
+                        fs->rename(upgrade_key_it->path(), 
upgrade_key_it->path().native() + "_0"));
+            }
+        }
+        return Status::OK();
+    };
+    std::error_code ec;
+    if constexpr (USE_CACHE_VERSION2) {
+        std::filesystem::directory_iterator key_prefix_it {_cache_base_path, 
ec};
+        if (ec) [[unlikely]] {
+            LOG(WARNING) << ec.message();
+            return Status::IOError(ec.message());
+        }
+        for (; key_prefix_it != std::filesystem::directory_iterator(); 
++key_prefix_it) {
+            if (!key_prefix_it->is_directory()) {
+                // maybe version hits file
+                continue;
+            }
+            if (key_prefix_it->path().filename().native().size() != 
KEY_PREFIX_LENGTH) {
+                LOG(WARNING) << "Unknown directory " << 
key_prefix_it->path().native()
+                             << ", try to remove it";
+                RETURN_IF_ERROR(fs->delete_directory(key_prefix_it->path()));
+            }
+            std::filesystem::directory_iterator key_it {key_prefix_it->path(), 
ec};
+            if (ec) [[unlikely]] {
+                return Status::IOError(ec.message());
+            }
+            RETURN_IF_ERROR(rebuild_dir(key_it));
+        }
+    } else {
+        std::filesystem::directory_iterator key_it {_cache_base_path, ec};
+        if (ec) [[unlikely]] {
+            return Status::IOError(ec.message());
+        }
+        RETURN_IF_ERROR(rebuild_dir(key_it));
+    }
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::write_file_cache_version() const {
+    if constexpr (USE_CACHE_VERSION2) {
+        std::string version_path = get_version_path();
+        Slice version("2.0");
+        FileWriterPtr version_writer;
+        RETURN_IF_ERROR(fs->create_file(version_path, &version_writer));
+        RETURN_IF_ERROR(version_writer->append(version));
+        return version_writer->close();
+    }
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::read_file_cache_version(std::string* buffer) const {
+    std::string version_path = get_version_path();
+    bool exists = false;
+    RETURN_IF_ERROR(fs->exists(version_path, &exists));
+    if (!exists) {
+        *buffer = "1.0";
+        return Status::OK();
+    }
+    FileReaderSPtr version_reader;
+    int64_t file_size = -1;
+    RETURN_IF_ERROR(fs->file_size(version_path, &file_size));
+    buffer->resize(file_size);
+    RETURN_IF_ERROR(fs->open_file(version_path, &version_reader));
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(version_reader->read_at(0, Slice(buffer->data(), 
file_size), &bytes_read));
+    RETURN_IF_ERROR(version_reader->close());
+    return Status::OK();
+}
+
+std::string FSFileCacheStorage::get_version_path() const {
+    return Path(_cache_base_path) / "version";
+}
+
+void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) 
const {

Review Comment:
   warning: function 'load_cache_info_into_memory' has cognitive complexity of 
77 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) 
const {
                            ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/io/cache/fs_file_cache_storage.cpp:342:** nesting level increased 
to 1
   ```cpp
       auto add_cell_batch_func = [&]() {
                                  ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:344:** nesting level increased 
to 2
   ```cpp
           auto f = [&](const BatchLoadArgs& args) {
                    ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:346:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (_mgr->_files.contains(args.hash) && 
_mgr->_files[args.hash].contains(args.offset)) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:346:** +1
   ```cpp
               if (_mgr->_files.contains(args.hash) && 
_mgr->_files[args.hash].contains(args.offset)) {
                                                    ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:350:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (!args.is_tmp) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:357:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (ec) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:365:** nesting level increased 
to 1
   ```cpp
       auto scan_file_cache = [&](std::filesystem::directory_iterator& key_it) {
                              ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:367:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:376:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (ec) [[unlikely]] {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:384:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               for (; offset_it != std::filesystem::directory_iterator(); 
++offset_it) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:392:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                       if (delim_pos1 == std::string::npos) {
                       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:395:** +1, nesting level 
increased to 4
   ```cpp
                       } else {
                         ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:400:** +5, including nesting 
penalty of 4, nesting level increased to 5
   ```cpp
                           if (suffix == "tmp") [[unlikely]] {
                           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:402:** +1, nesting level 
increased to 5
   ```cpp
                           } else {
                             ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:406:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   } catch (...) {
                     ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:410:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   if (!parsed) {
                   ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:416:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   if (ec) {
                   ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:422:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   if (size == 0 && !is_tmp) {
                   ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:422:** +1
   ```cpp
                   if (size == 0 && !is_tmp) {
                                 ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:424:** +5, including nesting 
penalty of 4, nesting level increased to 5
   ```cpp
                       if (!st.ok()) {
                       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:441:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   if (batch_load_buffer.size() >= scan_length) {
                   ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:449:** +1, including nesting 
penalty of 0, nesting level increased to 1
   ```cpp
       if constexpr (USE_CACHE_VERSION2) {
       ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:451:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (ec) {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:455:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           for (; key_prefix_it != std::filesystem::directory_iterator(); 
++key_prefix_it) {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:456:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (!key_prefix_it->is_directory()) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:460:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (key_prefix_it->path().filename().native().size() != 
KEY_PREFIX_LENGTH) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:465:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   if (ec) {
                   ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:472:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (ec) {
               ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:478:** +1, nesting level 
increased to 1
   ```cpp
       } else {
         ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:480:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (ec) {
           ^
   ```
   **be/src/io/cache/fs_file_cache_storage.cpp:486:** +1, including nesting 
penalty of 0, nesting level increased to 1
   ```cpp
       if (!batch_load_buffer.empty()) {
       ^
   ```
   
   </details>
   



##########
be/src/io/cache/fs_file_cache_storage.cpp:
##########
@@ -0,0 +1,583 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/fs_file_cache_storage.h"
+
+#include <filesystem>
+#include <mutex>
+#include <system_error>
+
+#include "common/logging.h"
+#include "common/sync_point.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/file_block.h"
+#include "io/cache/file_cache_common.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/file_writer.h"
+#include "runtime/exec_env.h"
+#include "vec/common/hex.h"
+
+namespace doris::io {
+
+struct BatchLoadArgs {
+    UInt128Wrapper hash;
+    CacheContext ctx;
+    uint64_t offset;
+    size_t size;
+    std::string key_path;
+    std::string offset_path;
+    bool is_tmp;
+};
+
+FDCache* FDCache::instance() {
+    return ExecEnv::GetInstance()->file_cache_open_fd_cache();
+}
+
+std::shared_ptr<FileReader> FDCache::get_file_reader(const AccessKeyAndOffset& 
key) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return nullptr;
+    }
+    std::shared_lock rlock(_mtx);
+    if (auto iter = _file_name_to_reader.find(key); iter != 
_file_name_to_reader.end()) {
+        return iter->second->second;
+    }
+    return nullptr;
+}
+
+void FDCache::insert_file_reader(const AccessKeyAndOffset& key,
+                                 std::shared_ptr<FileReader> file_reader) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return;
+    }
+    std::lock_guard wlock(_mtx);
+
+    if (auto iter = _file_name_to_reader.find(key); iter == 
_file_name_to_reader.end()) {
+        if (config::file_cache_max_file_reader_cache_size == 
_file_reader_list.size()) {
+            _file_name_to_reader.erase(_file_reader_list.back().first);
+            _file_reader_list.pop_back();
+        }
+        _file_reader_list.emplace_front(key, std::move(file_reader));
+        _file_name_to_reader.insert(std::make_pair(key, 
_file_reader_list.begin()));
+    }
+}
+
+void FDCache::remove_file_reader(const AccessKeyAndOffset& key) {
+    if (config::file_cache_max_file_reader_cache_size == 0) [[unlikely]] {
+        return;
+    }
+    std::lock_guard wlock(_mtx);
+    if (auto iter = _file_name_to_reader.find(key); iter != 
_file_name_to_reader.end()) {
+        _file_reader_list.erase(iter->second);
+        _file_name_to_reader.erase(key);
+    }
+}
+
+bool FDCache::contains_file_reader(const AccessKeyAndOffset& key) {
+    std::shared_lock rlock(_mtx);
+    return _file_name_to_reader.contains(key);
+}
+
+size_t FDCache::file_reader_cache_size() {
+    std::shared_lock rlock(_mtx);
+    return _file_reader_list.size();
+}
+
+Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
+    _cache_base_path = _mgr->_cache_base_path;
+    RETURN_IF_ERROR(rebuild_data_structure());
+    _cache_background_load_thread = std::thread([this, mgr = _mgr]() {
+        load_cache_info_into_memory(mgr);
+        mgr->_lazy_open_done = true;
+        LOG_INFO("FileCache {} lazy load done.", _cache_base_path);
+    });
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::append(const FileCacheKey& key, const Slice& value) 
{
+    FileWriter* writer = nullptr;
+    {
+        std::lock_guard lock(_mtx);
+        auto file_writer_map_key = std::make_pair(key.hash, key.offset);
+        if (auto iter = _key_to_writer.find(file_writer_map_key); iter != 
_key_to_writer.end()) {
+            writer = iter->second.get();
+        } else {
+            std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+            auto st = fs->create_directory(dir, false);
+            if (!st.ok() && !st.is<ErrorCode::ALREADY_EXIST>()) {
+                return st;
+            }
+            std::string tmp_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type, true);
+            FileWriterPtr file_writer;
+            FileWriterOptions opts {.sync_file_data = false};
+            RETURN_IF_ERROR(fs->create_file(tmp_file, &file_writer, &opts));
+            writer = file_writer.get();
+            _key_to_writer.emplace(file_writer_map_key, 
std::move(file_writer));
+        }
+    }
+    DCHECK_NE(writer, nullptr);
+    return writer->append(value);
+}
+
+Status FSFileCacheStorage::finalize(const FileCacheKey& key) {
+    FileWriterPtr file_writer;
+    {
+        std::lock_guard lock(_mtx);
+        auto file_writer_map_key = std::make_pair(key.hash, key.offset);
+        auto iter = _key_to_writer.find(file_writer_map_key);
+        DCHECK(iter != _key_to_writer.end());
+        file_writer = std::move(iter->second);
+        _key_to_writer.erase(iter);
+    }
+    if (!file_writer->is_closed()) {
+        RETURN_IF_ERROR(file_writer->close());
+    }
+    std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+    std::string true_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type);
+    return fs->rename(file_writer->path(), true_file);
+}
+
+Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, 
Slice buffer) {
+    AccessKeyAndOffset fd_key = std::make_pair(key.hash, key.offset);
+    FileReaderSPtr file_reader = FDCache::instance()->get_file_reader(fd_key);
+    if (!file_reader) {
+        std::string file =
+                get_path_in_local_cache(get_path_in_local_cache(key.hash, 
key.meta.expiration_time),
+                                        key.offset, key.meta.type);
+        RETURN_IF_ERROR(fs->open_file(file, &file_reader));
+        FDCache::instance()->insert_file_reader(fd_key, file_reader);
+    }
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(file_reader->read_at(value_offset, buffer, &bytes_read));
+    DCHECK(bytes_read == buffer.get_size());
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::remove(const FileCacheKey& key) {
+    std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+    std::string file = get_path_in_local_cache(dir, key.offset, key.meta.type);
+    FDCache::instance()->remove_file_reader(std::make_pair(key.hash, 
key.offset));
+    RETURN_IF_ERROR(fs->delete_file(file));
+    std::vector<FileInfo> files;
+    bool exists {false};
+    RETURN_IF_ERROR(fs->list(dir, true, &files, &exists));
+    DCHECK(exists);
+    if (files.empty()) {
+        RETURN_IF_ERROR(fs->delete_directory(dir));
+    }
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::change_key_meta(const FileCacheKey& key, const 
KeyMeta& new_meta) {
+    // TTL change
+    if (key.meta.expiration_time != new_meta.expiration_time) {
+        std::string original_dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+        std::string new_dir = get_path_in_local_cache(key.hash, 
new_meta.expiration_time);
+        // It will be concurrent, but we don't care who rename
+        Status st = fs->rename(original_dir, new_dir);
+        if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
+            return st;
+        }
+    } else if (key.meta.type != new_meta.type) {
+        std::string dir = get_path_in_local_cache(key.hash, 
key.meta.expiration_time);
+        std::string original_file = get_path_in_local_cache(dir, key.offset, 
key.meta.type);
+        std::string new_file = get_path_in_local_cache(dir, key.offset, 
new_meta.type);
+        RETURN_IF_ERROR(fs->rename(original_file, new_file));
+    }
+    return Status::OK();
+}
+
+std::string FSFileCacheStorage::get_path_in_local_cache(const std::string& 
dir, size_t offset,
+                                                        FileCacheType type, 
bool is_tmp) {
+    return Path(dir) / (std::to_string(offset) +
+                        (is_tmp ? "_tmp" : 
BlockFileCache::cache_type_to_string(type)));
+}
+
+std::string FSFileCacheStorage::get_path_in_local_cache(const UInt128Wrapper& 
value,
+                                                        uint64_t 
expiration_time) const {
+    auto str = value.to_string();
+    try {
+        if constexpr (USE_CACHE_VERSION2) {
+            return Path(_cache_base_path) / str.substr(0, KEY_PREFIX_LENGTH) /
+                   (str + "_" + std::to_string(expiration_time));
+        } else {
+            return Path(_cache_base_path) / (str + "_" + 
std::to_string(expiration_time));
+        }
+    } catch (std::filesystem::filesystem_error& e) {
+        LOG_WARNING("fail to get_path_in_local_cache")
+                .tag("err", e.what())
+                .tag("key", value.to_string())
+                .tag("expiration_time", expiration_time);
+        return "";
+    }
+}
+
+Status FSFileCacheStorage::rebuild_data_structure() const {
+    /// version 1.0: cache_base_path / key / offset
+    /// version 2.0: cache_base_path / key_prefix / key / offset
+    std::string version;
+    RETURN_IF_ERROR(read_file_cache_version(&version));
+    if (USE_CACHE_VERSION2 && version != "2.0") {
+        // move directories format as version 2.0
+        std::error_code ec;
+        std::filesystem::directory_iterator key_it {_cache_base_path, ec};
+        if (ec) {
+            return Status::InternalError("Failed to list dir {}: {}", 
_cache_base_path,
+                                         ec.message());
+        }
+        for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
+            if (key_it->is_directory()) {
+                std::string cache_key = key_it->path().filename().native();
+                if (cache_key.size() > KEY_PREFIX_LENGTH) {
+                    std::string key_prefix =
+                            Path(_cache_base_path) / cache_key.substr(0, 
KEY_PREFIX_LENGTH);
+                    bool exists = false;
+                    RETURN_IF_ERROR(fs->exists(key_prefix, &exists));
+                    if (!exists) {
+                        RETURN_IF_ERROR(fs->create_directory(key_prefix));
+                    }
+                    RETURN_IF_ERROR(fs->rename(key_it->path(), key_prefix / 
cache_key));
+                }
+            }
+        }
+        if (!write_file_cache_version().ok()) {
+            return Status::InternalError("Failed to write version hints for 
file cache");
+        }
+    }
+
+    auto rebuild_dir = [&](std::filesystem::directory_iterator& 
upgrade_key_it) -> Status {
+        for (; upgrade_key_it != std::filesystem::directory_iterator(); 
++upgrade_key_it) {
+            if (upgrade_key_it->path().filename().native().find('_') == 
std::string::npos) {
+                
RETURN_IF_ERROR(fs->delete_directory(upgrade_key_it->path().native() + "_0"));
+                RETURN_IF_ERROR(
+                        fs->rename(upgrade_key_it->path(), 
upgrade_key_it->path().native() + "_0"));
+            }
+        }
+        return Status::OK();
+    };
+    std::error_code ec;
+    if constexpr (USE_CACHE_VERSION2) {
+        std::filesystem::directory_iterator key_prefix_it {_cache_base_path, 
ec};
+        if (ec) [[unlikely]] {
+            LOG(WARNING) << ec.message();
+            return Status::IOError(ec.message());
+        }
+        for (; key_prefix_it != std::filesystem::directory_iterator(); 
++key_prefix_it) {
+            if (!key_prefix_it->is_directory()) {
+                // maybe version hits file
+                continue;
+            }
+            if (key_prefix_it->path().filename().native().size() != 
KEY_PREFIX_LENGTH) {
+                LOG(WARNING) << "Unknown directory " << 
key_prefix_it->path().native()
+                             << ", try to remove it";
+                RETURN_IF_ERROR(fs->delete_directory(key_prefix_it->path()));
+            }
+            std::filesystem::directory_iterator key_it {key_prefix_it->path(), 
ec};
+            if (ec) [[unlikely]] {
+                return Status::IOError(ec.message());
+            }
+            RETURN_IF_ERROR(rebuild_dir(key_it));
+        }
+    } else {
+        std::filesystem::directory_iterator key_it {_cache_base_path, ec};
+        if (ec) [[unlikely]] {
+            return Status::IOError(ec.message());
+        }
+        RETURN_IF_ERROR(rebuild_dir(key_it));
+    }
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::write_file_cache_version() const {
+    if constexpr (USE_CACHE_VERSION2) {
+        std::string version_path = get_version_path();
+        Slice version("2.0");
+        FileWriterPtr version_writer;
+        RETURN_IF_ERROR(fs->create_file(version_path, &version_writer));
+        RETURN_IF_ERROR(version_writer->append(version));
+        return version_writer->close();
+    }
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::read_file_cache_version(std::string* buffer) const {
+    std::string version_path = get_version_path();
+    bool exists = false;
+    RETURN_IF_ERROR(fs->exists(version_path, &exists));
+    if (!exists) {
+        *buffer = "1.0";
+        return Status::OK();
+    }
+    FileReaderSPtr version_reader;
+    int64_t file_size = -1;
+    RETURN_IF_ERROR(fs->file_size(version_path, &file_size));
+    buffer->resize(file_size);
+    RETURN_IF_ERROR(fs->open_file(version_path, &version_reader));
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(version_reader->read_at(0, Slice(buffer->data(), 
file_size), &bytes_read));
+    RETURN_IF_ERROR(version_reader->close());
+    return Status::OK();
+}
+
+std::string FSFileCacheStorage::get_version_path() const {
+    return Path(_cache_base_path) / "version";
+}
+
+void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) 
const {
+    int scan_length = 10000;
+    std::vector<BatchLoadArgs> batch_load_buffer;
+    batch_load_buffer.reserve(scan_length);
+    auto add_cell_batch_func = [&]() {
+        std::lock_guard cache_lock(_mgr->_mutex);
+        auto f = [&](const BatchLoadArgs& args) {
+            // in async load mode, a cell may be added twice.
+            if (_mgr->_files.contains(args.hash) && 
_mgr->_files[args.hash].contains(args.offset)) {
+                return;
+            }
+            // if the file is tmp, it means it is the old file and it should 
be removed
+            if (!args.is_tmp) {
+                _mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
+                               FileBlock::State::DOWNLOADED, cache_lock);
+                return;
+            }
+            std::error_code ec;
+            std::filesystem::remove(args.offset_path, ec);
+            if (ec) {
+                LOG(WARNING) << fmt::format("cannot remove {}: {}", 
args.offset_path, ec.message());
+            }
+        };
+        std::for_each(batch_load_buffer.begin(), batch_load_buffer.end(), f);
+        batch_load_buffer.clear();
+    };
+
+    auto scan_file_cache = [&](std::filesystem::directory_iterator& key_it) {
+        TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile1");
+        for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
+            auto key_with_suffix = key_it->path().filename().native();
+            auto delim_pos = key_with_suffix.find('_');
+            DCHECK(delim_pos != std::string::npos);
+            std::string key_str = key_with_suffix.substr(0, delim_pos);
+            std::string expiration_time_str = key_with_suffix.substr(delim_pos 
+ 1);
+            auto hash = 
UInt128Wrapper(vectorized::unhex_uint<uint128_t>(key_str.c_str()));
+            std::error_code ec;
+            std::filesystem::directory_iterator offset_it(key_it->path(), ec);
+            if (ec) [[unlikely]] {
+                LOG(WARNING) << "filesystem error, failed to remove file, 
file=" << key_it->path()
+                             << " error=" << ec.message();
+                continue;
+            }
+            CacheContext context;
+            context.query_id = TUniqueId();
+            context.expiration_time = std::stoul(expiration_time_str);
+            for (; offset_it != std::filesystem::directory_iterator(); 
++offset_it) {
+                std::string offset_with_suffix = 
offset_it->path().filename().native();
+                auto delim_pos1 = offset_with_suffix.find('_');
+                FileCacheType cache_type = FileCacheType::NORMAL;
+                bool parsed = true;
+                bool is_tmp = false;
+                size_t offset = 0;
+                try {
+                    if (delim_pos1 == std::string::npos) {
+                        // same as type "normal"
+                        offset = stoull(offset_with_suffix);
+                    } else {
+                        offset = stoull(offset_with_suffix.substr(0, 
delim_pos1));
+                        std::string suffix = 
offset_with_suffix.substr(delim_pos1 + 1);
+                        // not need persistent anymore
+                        // if suffix is equals to "tmp", it should be removed 
too.
+                        if (suffix == "tmp") [[unlikely]] {
+                            is_tmp = true;
+                        } else {
+                            cache_type = 
BlockFileCache::string_to_cache_type(suffix);
+                        }
+                    }
+                } catch (...) {
+                    parsed = false;
+                }
+
+                if (!parsed) {
+                    LOG(WARNING) << "parse offset err, path=" << 
offset_it->path().native();
+                    continue;
+                }
+                TEST_SYNC_POINT_CALLBACK("BlockFileCache::REMOVE_FILE_2", 
&offset_with_suffix);
+                size_t size = offset_it->file_size(ec);
+                if (ec) {
+                    LOG(WARNING) << "failed to file_size: file_name=" << 
offset_with_suffix
+                                 << "error=" << ec.message();
+                    continue;
+                }
+
+                if (size == 0 && !is_tmp) {
+                    auto st = fs->delete_file(offset_it->path());
+                    if (!st.ok()) {
+                        LOG_WARNING("delete file {} error", 
offset_it->path().native()).error(st);
+                    }
+                    continue;
+                }
+                context.cache_type = cache_type;
+                BatchLoadArgs args;
+                args.ctx = context;
+                args.hash = hash;
+                args.key_path = key_it->path();
+                args.offset_path = offset_it->path();
+                args.size = size;
+                args.offset = offset;
+                args.is_tmp = is_tmp;
+                batch_load_buffer.push_back(std::move(args));
+
+                // add lock
+                if (batch_load_buffer.size() >= scan_length) {
+                    add_cell_batch_func();
+                    std::this_thread::sleep_for(std::chrono::microseconds(10));
+                }
+            }
+        }
+    };
+    std::error_code ec;
+    if constexpr (USE_CACHE_VERSION2) {
+        std::filesystem::directory_iterator key_prefix_it {_cache_base_path, 
ec};
+        if (ec) {
+            LOG(WARNING) << ec.message();
+            return;
+        }
+        for (; key_prefix_it != std::filesystem::directory_iterator(); 
++key_prefix_it) {
+            if (!key_prefix_it->is_directory()) {
+                // maybe version hits file
+                continue;
+            }
+            if (key_prefix_it->path().filename().native().size() != 
KEY_PREFIX_LENGTH) {
+                LOG(WARNING) << "Unknown directory " << 
key_prefix_it->path().native()
+                             << ", try to remove it";
+                std::error_code ec;
+                std::filesystem::remove(key_prefix_it->path(), ec);
+                if (ec) {
+                    LOG(WARNING) << "failed to remove=" << 
key_prefix_it->path()
+                                 << " msg=" << ec.message();
+                }
+                continue;
+            }
+            std::filesystem::directory_iterator key_it {key_prefix_it->path(), 
ec};
+            if (ec) {
+                LOG(WARNING) << ec.message();
+                continue;
+            }
+            scan_file_cache(key_it);
+        }
+    } else {
+        std::filesystem::directory_iterator key_it {_cache_base_path, ec};
+        if (ec) {
+            LOG(WARNING) << ec.message();
+            return;
+        }
+        scan_file_cache(key_it);
+    }
+    if (!batch_load_buffer.empty()) {
+        add_cell_batch_func();
+    }
+    TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile2");
+}
+
+void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, 
const FileCacheKey& key,

Review Comment:
   warning: function 'load_blocks_directly_unlocked' exceeds recommended 
size/complexity thresholds [readability-function-size]
   ```cpp
   void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, 
const FileCacheKey& key,
                            ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/io/cache/fs_file_cache_storage.cpp:492:** 81 lines including 
whitespace and comments (threshold 80)
   ```cpp
   void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, 
const FileCacheKey& key,
                            ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to