This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 4c3a586b6ae branch-4.0: [fix](packed-file) Fix packed file cache cleanup issue (#59892) (#60052)
4c3a586b6ae is described below

commit 4c3a586b6ae451761f9b6d43c62af03bce6d0a8f
Author: Xin Liao <[email protected]>
AuthorDate: Tue Jan 20 17:30:56 2026 +0800

    branch-4.0: [fix](packed-file) Fix packed file cache cleanup issue (#59892) (#60052)
    
    Cherry-picked from https://github.com/apache/doris/pull/59892
---
 be/src/io/fs/packed_file_manager.cpp           | 121 ++++++++++++++++++++++++-
 be/src/io/fs/packed_file_system.cpp            |  42 ++++++---
 be/test/io/fs/packed_file_concurrency_test.cpp |  15 ++-
 3 files changed, 160 insertions(+), 18 deletions(-)

diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp
index 3cf65f12e49..53a445502c7 100644
--- a/be/src/io/fs/packed_file_manager.cpp
+++ b/be/src/io/fs/packed_file_manager.cpp
@@ -41,10 +41,15 @@
 #include "cloud/config.h"
 #include "common/config.h"
 #include "gen_cpp/cloud.pb.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/file_block.h"
+#include "io/cache/file_cache_common.h"
 #include "io/fs/packed_file_trailer.h"
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
 #include "util/coding.h"
+#include "util/slice.h"
 #include "util/uid_util.h"
 
 namespace doris::io {
@@ -75,6 +80,12 @@ bvar::Window<bvar::IntRecorder> g_packed_file_uploading_to_uploaded_ms_window(
         "packed_file_uploading_to_uploaded_ms", 
&g_packed_file_uploading_to_uploaded_ms_recorder,
         /*window_size=*/10);
 
+/* Metrics for async small file cache write.
+ * Although named "count"/"bytes", they are used as in-flight gauges:
+ * write_small_file_to_cache_async() adds 1 and the payload size before submitting a
+ * task, and subtracts them again when that task finishes or when submission fails.
+ * The synchronous fallback path (no thread pool available) does not touch them. */
+bvar::Adder<int64_t> g_packed_file_cache_async_write_count("packed_file_cache",
+                                                           "async_write_count");
+bvar::Adder<int64_t> g_packed_file_cache_async_write_bytes("packed_file_cache",
+                                                           "async_write_bytes");
+
 Status append_packed_info_trailer(FileWriter* writer, const std::string& 
packed_file_path,
                                   const cloud::PackedFileInfoPB& 
packed_file_info) {
     if (writer == nullptr) {
@@ -108,6 +119,104 @@ Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_
     return writer->append(Slice(trailer));
 }
 
+/*
+ * Best-effort, synchronous write of one small file's bytes into the block file cache.
+ *
+ * small_file_path: path of the small file; only its filename component is hashed to
+ *                  build the cache key.
+ * data:            bytes to cache, placed starting at cache offset 0; an empty string
+ *                  is a no-op.
+ * tablet_id:       only used in the debug log line.
+ *
+ * Returns nothing: a missing cache instance is silently skipped, and append/finalize
+ * failures are logged at WARNING level and otherwise ignored.
+ */
+void do_write_to_file_cache(const std::string& small_file_path, const std::string& data,
+                            int64_t tablet_id) {
+    if (data.empty()) {
+        return;
+    }
+
+    /* Generate cache key from small file path (e.g., "rowset_id_seg_id.dat").
+     * Only the filename is hashed, so files with the same filename in different
+     * directories map to the same cache entry. This filename-based key is what the
+     * commit relies on to match the cleanup key used by Rowset::clear_cache(). */
+    Path path(small_file_path);
+    UInt128Wrapper cache_hash = BlockFileCache::hash(path.filename().native());
+
+    VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
+               << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string()
+               << " size=" << data.size() << " tablet_id=" << tablet_id;
+
+    BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash);
+    if (file_cache == nullptr) {
+        return; // Cache not available, skip
+    }
+
+    /* Allocate cache blocks: request NORMAL-type blocks starting at offset 0 for
+     * data.size() bytes under cache_hash. ctx.stats points at a stack-local
+     * ReadStatistics whose contents are not read afterwards. */
+    CacheContext ctx;
+    ctx.cache_type = FileCacheType::NORMAL;
+    ReadStatistics stats;
+    ctx.stats = &stats;
+
+    FileBlocksHolder holder = file_cache->get_or_set(cache_hash, 0, data.size(), ctx);
+
+    /* Write data to cache blocks. data_offset is a running sum of the bytes consumed so
+     * far, which assumes holder.file_blocks are ordered and contiguous from offset 0
+     * (NOTE(review): block->range().left is not consulted -- confirm get_or_set()
+     * guarantees this). Each write is clamped to the bytes still remaining. Only EMPTY
+     * blocks claimed by this thread are filled; any other block is skipped, but
+     * data_offset still advances past it. */
+    size_t data_offset = 0;
+    for (auto& block : holder.file_blocks) {
+        if (data_offset >= data.size()) {
+            break;
+        }
+        size_t block_size = block->range().size();
+        size_t write_size = std::min(block_size, data.size() - data_offset);
+
+        if (block->state() == FileBlock::State::EMPTY) {
+            block->get_or_set_downloader();
+            if (block->is_downloader()) { // only the claiming downloader writes the block
+                Slice s(data.data() + data_offset, write_size);
+                Status st = block->append(s);
+                if (st.ok()) {
+                    st = block->finalize(); // finalize only after a successful append
+                }
+                if (!st.ok()) { // best-effort: log and move on to the next block
+                    LOG(WARNING) << "Write small file to cache failed: " << st.msg();
+                }
+            }
+        }
+        data_offset += write_size;
+    }
+}
+
+/*
+ * Submit a best-effort write of one small file into the block file cache.
+ *
+ * small_file_path: passed to do_write_to_file_cache(), which keys the cache entry on
+ *                  its filename.
+ * data:            bytes to cache; nothing happens if it is empty or if
+ *                  config::enable_file_cache is false.
+ * tablet_id:       forwarded to do_write_to_file_cache().
+ *
+ * Runs the write on ExecEnv's s3_file_upload_thread_pool() when that pool exists,
+ * otherwise writes synchronously on the calling thread. Errors are logged, never returned.
+ */
+// The data is copied into the lambda capture to ensure its lifetime extends beyond
+// the async task execution. The original Slice may reference a buffer that gets
+// reused or freed before the async task runs.
+void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data,
+                                     int64_t tablet_id) {
+    if (!config::enable_file_cache || data.size == 0) {
+        return;
+    }
+
+    // Copy data since original buffer may be reused before async task executes.
+    // NOTE(review): copy cost assumes small (< 1MB) files; no size limit is enforced here.
+    std::string data_copy(data.data, data.size);
+    size_t data_size = data.size; // still needed after data_copy is moved into the task
+
+    auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
+    if (thread_pool == nullptr) {
+        // Fallback to sync write if thread pool not available (not tracked by the metrics)
+        do_write_to_file_cache(small_file_path, data_copy, tablet_id);
+        return;
+    }
+
+    // In-flight accounting: undone when the task finishes or if submission fails below.
+    g_packed_file_cache_async_write_count << 1;
+    g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);
+
+    Status st = thread_pool->submit_func(
+            [path = small_file_path, data = std::move(data_copy), tablet_id, data_size]() {
+                do_write_to_file_cache(path, data, tablet_id);
+                // Task done (the write reports no status): remove it from the in-flight metrics
+                g_packed_file_cache_async_write_count << -1;
+                g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
+            });
+
+    if (!st.ok()) {
+        // Revert metrics since task was not submitted
+        g_packed_file_cache_async_write_count << -1;
+        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
+        LOG(WARNING) << "Failed to submit cache write task for " << small_file_path << ": "
+                     << st.msg();
+        // Don't block on failure, cache write is best-effort
+    }
+}
+
 } // namespace
 
 PackedFileManager* PackedFileManager::instance() {
@@ -150,8 +259,11 @@ Status PackedFileManager::create_new_packed_file_context(
     // Create file writer for the packed file
     FileWriterPtr new_writer;
     FileWriterOptions opts;
-    // enable write file cache for packed file
-    opts.write_file_cache = true;
+    // Disable write_file_cache for packed file itself.
+    // We write file cache for each small file separately in 
append_small_file()
+    // using the small file path as cache key, ensuring cache entries can be
+    // properly cleaned up when stale rowsets are removed.
+    opts.write_file_cache = false;
     RETURN_IF_ERROR(
             packed_file_ctx->file_system->create_file(Path(relative_path), 
&new_writer, &opts));
     packed_file_ctx->writer = std::move(new_writer);
@@ -253,6 +365,11 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice
     // Write data to current packed file
     RETURN_IF_ERROR(active_state->writer->append(data));
 
+    // Async write data to file cache using small file path as cache key.
+    // This ensures cache key matches the cleanup key in Rowset::clear_cache(),
+    // allowing proper cache cleanup when stale rowsets are removed.
+    write_small_file_to_cache_async(path, data, info.tablet_id);
+
     // Update index
     PackedSliceLocation location;
     location.packed_file_path = active_state->packed_file_path;
diff --git a/be/src/io/fs/packed_file_system.cpp b/be/src/io/fs/packed_file_system.cpp
index dd5b136ba3b..7ce027a94a2 100644
--- a/be/src/io/fs/packed_file_system.cpp
+++ b/be/src/io/fs/packed_file_system.cpp
@@ -20,6 +20,7 @@
 #include <utility>
 
 #include "common/status.h"
+#include "io/fs/file_reader.h"
 #include "io/fs/packed_file_reader.h"
 #include "io/fs/packed_file_writer.h"
 
@@ -69,18 +70,37 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader
         // File is in packed file, open packed file and wrap with 
PackedFileReader
         const auto& index = it->second;
         FileReaderSPtr inner_reader;
-        // Create a new FileReaderOptions with the correct file size
-        FileReaderOptions local_opts = opts ? *opts : FileReaderOptions();
-        // Set file_size to packed file size to avoid head object request
-        local_opts.file_size = index.packed_file_size;
-        LOG(INFO) << "open packed file: " << index.packed_file_path << ", 
file: " << file.native()
-                  << ", offset: " << index.offset << ", size: " << index.size
-                  << ", packed_file_size: " << index.packed_file_size;
+
+        // Create options for opening the packed file
+        // Disable cache at this layer - we'll add cache wrapper around 
PackedFileReader instead
+        // This ensures cache key is based on segment path, not packed file 
path
+        FileReaderOptions inner_opts = opts ? *opts : FileReaderOptions();
+        inner_opts.file_size = index.packed_file_size;
+        inner_opts.cache_type = FileCachePolicy::NO_CACHE;
+
+        VLOG_DEBUG << "open packed file: " << index.packed_file_path << ", 
file: " << file.native()
+                   << ", offset: " << index.offset << ", size: " << index.size
+                   << ", packed_file_size: " << index.packed_file_size;
         RETURN_IF_ERROR(
-                _inner_fs->open_file(Path(index.packed_file_path), 
&inner_reader, &local_opts));
+                _inner_fs->open_file(Path(index.packed_file_path), 
&inner_reader, &inner_opts));
+
+        // Create PackedFileReader with segment path
+        // PackedFileReader.path() returns segment path, not packed file path
+        auto packed_reader = 
std::make_shared<PackedFileReader>(std::move(inner_reader), file,
+                                                                index.offset, 
index.size);
 
-        *reader = std::make_shared<PackedFileReader>(std::move(inner_reader), 
file, index.offset,
-                                                     index.size);
+        // If cache is requested, wrap PackedFileReader with 
CachedRemoteFileReader
+        // This ensures:
+        // 1. Cache key = hash(segment_path.filename()) - matches cleanup key
+        // 2. Cache size = segment size - correct boundary
+        // 3. Each segment has independent cache entry - no interference 
during cleanup
+        if (opts && opts->cache_type != FileCachePolicy::NO_CACHE) {
+            FileReaderOptions cache_opts = *opts;
+            cache_opts.file_size = index.size; // Use segment size for cache
+            *reader = DORIS_TRY(create_cached_file_reader(packed_reader, 
cache_opts));
+        } else {
+            *reader = packed_reader;
+        }
     } else {
         RETURN_IF_ERROR(_inner_fs->open_file(file, reader, opts));
     }
@@ -88,7 +108,7 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader
 }
 
 Status PackedFileSystem::exists_impl(const Path& path, bool* res) const {
-    LOG(INFO) << "packed file system exist, rowset id " << 
_append_info.rowset_id;
+    VLOG_DEBUG << "packed file system exist, rowset id " << 
_append_info.rowset_id;
     if (!_index_map_initialized) {
         return Status::InternalError("PackedFileSystem index map is not 
initialized");
     }
diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp b/be/test/io/fs/packed_file_concurrency_test.cpp
index 06db472b95e..31c2db19fb8 100644
--- a/be/test/io/fs/packed_file_concurrency_test.cpp
+++ b/be/test/io/fs/packed_file_concurrency_test.cpp
@@ -677,7 +677,10 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) {
             std::uniform_int_distribution<int> read_size_dist(4 * 1024, 32 * 
1024);
 
             for (int iter = 0; iter < kIterationPerThread; ++iter) {
-                std::string path = fmt::format("/tablet_{}/rowset_{}/file_{}", 
tid, iter, iter);
+                // Use unique file names to avoid cache key conflicts between 
threads
+                // since CachedRemoteFileReader uses path().filename() for 
cache hash
+                std::string path =
+                        fmt::format("/tablet_{}/rowset_{}/file_t{}_i{}", tid, 
iter, tid, iter);
 
                 PackedAppendContext append_info;
                 append_info.resource_id = resource_ids[tid];
@@ -718,11 +721,13 @@ TEST_F(MergeFileConcurrencyTest, ConcurrentWriteReadCorrectness) {
                 opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE;
                 opts.is_doris_table = true;
                 ASSERT_TRUE(reader_fs.open_file(Path(path), &reader, 
&opts).ok());
-                auto* merge_reader = 
dynamic_cast<PackedFileReader*>(reader.get());
-                ASSERT_NE(merge_reader, nullptr);
-                auto* cached_reader =
-                        
dynamic_cast<CachedRemoteFileReader*>(merge_reader->_inner_reader.get());
+                // After the fix, CachedRemoteFileReader wraps 
PackedFileReader (not vice versa)
+                // This ensures cache key uses segment path for proper cleanup
+                auto* cached_reader = 
dynamic_cast<CachedRemoteFileReader*>(reader.get());
                 ASSERT_NE(cached_reader, nullptr);
+                auto* merge_reader =
+                        
dynamic_cast<PackedFileReader*>(cached_reader->get_remote_reader());
+                ASSERT_NE(merge_reader, nullptr);
 
                 IOContext io_ctx;
                 size_t verified = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to