This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 30c21789c8 [opt](filecache) use weak_ptr to cache the file handle of 
file segment (#21975)
30c21789c8 is described below

commit 30c21789c8f45831af0435a32af158d446473abc
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Mon Jul 24 19:09:27 2023 +0800

    [opt](filecache) use weak_ptr to cache the file handle of file segment 
(#21975)
    
    Use weak_ptr to cache the file handle of each file segment. The maximum
    number of cached file handles is configured by
    `file_cache_max_file_reader_cache_size` (default `1000000`).
    Users can inspect the number of cached file handles by requesting the BE
    metrics endpoint `http://be_host:be_webserver_port/metrics`:
    ```
    # TYPE doris_be_file_cache_segment_reader_cache_size gauge
    doris_be_file_cache_segment_reader_cache_size{path="/mnt/datadisk1/gaoxin/file_cache"} 2500
    ```
---
 be/src/common/config.cpp                       |   1 +
 be/src/common/config.h                         |   2 +
 be/src/io/cache/block/block_file_cache.cpp     |  43 ++++++++
 be/src/io/cache/block/block_file_cache.h       |  25 +++++
 be/src/io/cache/block/block_file_segment.cpp   |  26 +++--
 be/src/io/cache/block/block_file_segment.h     |   6 +-
 be/src/io/cache/block/block_lru_file_cache.cpp |   3 +
 be/src/io/cache/block/block_lru_file_cache.h   |   1 +
 be/test/io/cache/file_block_cache_test.cpp     | 144 +++++++++++++++++++++++++
 9 files changed, 238 insertions(+), 13 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c727202ce3..9a0b2a0e3a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1009,6 +1009,7 @@ DEFINE_mInt32(s3_write_buffer_size, "5242880");
 // can at most buffer 50MB data. And the num of multi part upload task is
 // s3_write_buffer_whole_size / s3_write_buffer_size
 DEFINE_mInt32(s3_write_buffer_whole_size, "524288000");
+// Max number of file readers (open file handles) kept in the block file cache's
+// reader cache; IFileCache::cache_file_reader evicts the oldest entry at this size.
+DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
 
 //disable shrink memory by default
 DEFINE_Bool(enable_shrink_memory, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 05d7fcc658..169bbbac32 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1029,6 +1029,8 @@ DECLARE_mInt32(s3_write_buffer_size);
 // can at most buffer 50MB data. And the num of multi part upload task is
 // s3_write_buffer_whole_size / s3_write_buffer_size
 DECLARE_mInt32(s3_write_buffer_whole_size);
+// the max number of cached file handles for block segments
+DECLARE_mInt64(file_cache_max_file_reader_cache_size);
 //enable shrink memory
 DECLARE_Bool(enable_shrink_memory);
 // enable cache for high concurrent point query work load
diff --git a/be/src/io/cache/block/block_file_cache.cpp 
b/be/src/io/cache/block/block_file_cache.cpp
index 5c78c46851..0cd546aeaa 100644
--- a/be/src/io/cache/block/block_file_cache.cpp
+++ b/be/src/io/cache/block/block_file_cache.cpp
@@ -162,5 +162,48 @@ void IFileCache::QueryFileCacheContext::reserve(const Key& 
key, size_t offset, s
     }
 }
 
+// Updates the class-wide read-only flag. Switching read-only mode on also
+// empties the shared reader cache (the reader list first, then the key index).
+void IFileCache::set_read_only(bool read_only) {
+    s_read_only = read_only;
+    if (!read_only) {
+        return;
+    }
+    std::lock_guard guard(s_file_reader_cache_mtx);
+    s_file_reader_cache.clear();
+    s_file_name_to_reader.clear();
+}
+
+// Registers `file_reader` in the shared reader cache under `key` and returns a
+// weak reference to it. The cache keeps a strong reference, so the returned
+// weak_ptr expires once the entry is evicted/removed and no other owner is left.
+// In read-only mode nothing is cached and an empty weak_ptr is returned.
+//
+// The newest entry sits at the front of the list; entries are evicted from the back
+// while the cache is at or above `config::file_cache_max_file_reader_cache_size`.
+// A non-positive limit degenerates to keeping only the reader being inserted.
+std::weak_ptr<FileReader> IFileCache::cache_file_reader(const AccessKeyAndOffset& key,
+                                                        std::shared_ptr<FileReader> file_reader) {
+    std::weak_ptr<FileReader> wp;
+    if (!s_read_only) [[likely]] {
+        std::lock_guard lock(s_file_reader_cache_mtx);
+        // A reader may already be registered for this key. Drop it first: the map insert
+        // below does not overwrite, so the new list node would otherwise be orphaned.
+        if (auto iter = s_file_name_to_reader.find(key); iter != s_file_name_to_reader.end()) {
+            s_file_reader_cache.erase(iter->second);
+            s_file_name_to_reader.erase(iter);
+        }
+        // The limit is a mutable config. Evict with `>=` in a loop so that a lowered limit
+        // shrinks the cache, and check empty() so a limit <= 0 never calls back() on an
+        // empty list.
+        const int64_t max_size = config::file_cache_max_file_reader_cache_size;
+        while (!s_file_reader_cache.empty() &&
+               (max_size <= 0 || s_file_reader_cache.size() >= static_cast<size_t>(max_size))) {
+            s_file_name_to_reader.erase(s_file_reader_cache.back().first);
+            s_file_reader_cache.pop_back();
+        }
+        wp = file_reader;
+        s_file_reader_cache.emplace_front(key, std::move(file_reader));
+        s_file_name_to_reader.insert(std::make_pair(key, s_file_reader_cache.begin()));
+    }
+    return wp;
+}
+
+// Drops the cached reader registered under `key`; does nothing when the key is unknown.
+void IFileCache::remove_file_reader(const AccessKeyAndOffset& key) {
+    std::lock_guard guard(s_file_reader_cache_mtx);
+    auto pos = s_file_name_to_reader.find(key);
+    if (pos == s_file_name_to_reader.end()) {
+        return;
+    }
+    // Release the list node first, then the index entry that pointed at it.
+    s_file_reader_cache.erase(pos->second);
+    s_file_name_to_reader.erase(pos);
+}
+
+// Reports whether a reader is currently registered under `key`.
+bool IFileCache::contains_file_reader(const AccessKeyAndOffset& key) {
+    std::lock_guard guard(s_file_reader_cache_mtx);
+    return s_file_name_to_reader.count(key) != 0;
+}
+
+// Returns how many readers are currently registered in the shared reader cache.
+size_t IFileCache::file_reader_cache_size() {
+    std::lock_guard guard(s_file_reader_cache_mtx);
+    const size_t cached_readers = s_file_name_to_reader.size();
+    return cached_readers;
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/cache/block/block_file_cache.h 
b/be/src/io/cache/block/block_file_cache.h
index 0f1cc347d5..5d0dd1f06f 100644
--- a/be/src/io/cache/block/block_file_cache.h
+++ b/be/src/io/cache/block/block_file_cache.h
@@ -37,6 +37,7 @@
 #include "common/status.h"
 #include "io/cache/block/block_file_cache_fwd.h"
 #include "io/cache/block/block_file_cache_settings.h"
+#include "io/fs/file_reader.h"
 #include "io/io_common.h"
 #include "util/hash_util.hpp"
 #include "vec/common/uint128.h"
@@ -54,6 +55,7 @@ enum CacheType {
     NORMAL,
     DISPOSABLE,
 };
+
 struct CacheContext {
     CacheContext(const IOContext* io_ctx) {
         if (io_ctx->read_segment_index) {
@@ -292,6 +294,29 @@ public:
     };
     using QueryFileCacheContextHolderPtr = 
std::unique_ptr<QueryFileCacheContextHolder>;
     QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& 
query_id);
+
+private:
+    // Reader cache shared by every IFileCache instance (all members are static).
+    // Entries are (key, strong reader reference) pairs; new entries are pushed to the
+    // front and evictions pop from the back.
+    static inline std::list<std::pair<AccessKeyAndOffset, 
std::shared_ptr<FileReader>>>
+            s_file_reader_cache;
+    // Index from key to the matching node in s_file_reader_cache.
+    static inline std::unordered_map<AccessKeyAndOffset, 
decltype(s_file_reader_cache.begin()),
+                                     KeyAndOffsetHash>
+            s_file_name_to_reader;
+    // Guards s_file_reader_cache and s_file_name_to_reader.
+    static inline std::mutex s_file_reader_cache_mtx;
+    // While true, cache_file_reader() stores nothing and returns an empty weak_ptr.
+    static inline std::atomic_bool s_read_only {false};
+
+public:
+    // Sets the read-only flag; enabling it also clears the reader cache.
+    static void set_read_only(bool read_only);
+
+    // Returns the current value of the read-only flag.
+    static bool read_only() { return s_read_only; }
+
+    // Stores `file_reader` under `key` (unless read-only) and returns a weak reference to it.
+    static std::weak_ptr<FileReader> cache_file_reader(const 
AccessKeyAndOffset& key,
+                                                       
std::shared_ptr<FileReader> file_reader);
+
+    // Removes the reader stored under `key`, if any.
+    static void remove_file_reader(const AccessKeyAndOffset& key);
+
+    // Used by tests: query whether `key` is cached and how many readers are cached.
+    static bool contains_file_reader(const AccessKeyAndOffset& key);
+    static size_t file_reader_cache_size();
 };
 
 using CloudFileCachePtr = IFileCache*;
diff --git a/be/src/io/cache/block/block_file_segment.cpp 
b/be/src/io/cache/block/block_file_segment.cpp
index 130540f3e7..38d230d9bb 100644
--- a/be/src/io/cache/block/block_file_segment.cpp
+++ b/be/src/io/cache/block/block_file_segment.cpp
@@ -68,6 +68,13 @@ FileBlock::FileBlock(size_t offset_, size_t size_, const 
Key& key_, IFileCache*
     }
 }
 
+// Unregisters this block's reader from the shared reader cache, but only while
+// the weak handle held by the block is still alive.
+FileBlock::~FileBlock() {
+    // The local strong reference outlives the call, so the last reference is never
+    // released inside remove_file_reader() itself.
+    if (std::shared_ptr<FileReader> reader = _cache_reader.lock(); reader != nullptr) {
+        IFileCache::remove_file_reader(std::make_pair(_file_key, offset()));
+    }
+}
+
 FileBlock::State FileBlock::state() const {
     std::lock_guard segment_lock(_mutex);
     return _download_state;
@@ -171,21 +178,20 @@ std::string FileBlock::get_path_in_local_cache() const {
     return _cache->get_path_in_local_cache(key(), offset(), _cache_type);
 }
 
-Status FileBlock::read_at(Slice buffer, size_t offset) {
+// Reads buffer.size bytes from this block's local cache file, starting at
+// `read_offset`, into `buffer`. The reader is opened on first use and handed to
+// IFileCache's reader cache; the block itself only keeps a weak_ptr to it, so the
+// file is reopened whenever that weak_ptr has expired.
+// Returns the error from opening or reading the file, otherwise OK.
+Status FileBlock::read_at(Slice buffer, size_t read_offset) {
     Status st = Status::OK();
-    if (!_cache_reader) {
-        std::lock_guard segment_lock(_mutex);
-        if (!_cache_reader) {
+    std::shared_ptr<FileReader> reader;
+    {
+        // std::weak_ptr is not safe for a concurrent lock() and assignment on the same
+        // object, so every access to _cache_reader here is made while holding _mutex.
+        std::lock_guard<std::mutex> lock(_mutex);
+        if (!(reader = _cache_reader.lock())) {
             auto download_path = get_path_in_local_cache();
-            st = global_local_filesystem()->open_file(download_path, 
&_cache_reader);
-            if (!st) {
-                _cache_reader.reset();
-                return st;
-            }
+            RETURN_IF_ERROR(global_local_filesystem()->open_file(download_path, &reader));
+            _cache_reader =
+                    IFileCache::cache_file_reader(std::make_pair(_file_key, offset()), reader);
         }
     }
     size_t bytes_reads = buffer.size;
-    RETURN_IF_ERROR(_cache_reader->read_at(offset, buffer, &bytes_reads));
+    // `reader` is a strong reference, so the file stays open for the duration of the read
+    // even if the cache evicts the entry in the meantime.
+    RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads));
     DCHECK(bytes_reads == buffer.size);
     return st;
 }
diff --git a/be/src/io/cache/block/block_file_segment.h 
b/be/src/io/cache/block/block_file_segment.h
index 6826a673f9..b462259931 100644
--- a/be/src/io/cache/block/block_file_segment.h
+++ b/be/src/io/cache/block/block_file_segment.h
@@ -52,7 +52,7 @@ class FileBlock {
 public:
     using Key = IFileCache::Key;
     using LocalWriterPtr = std::unique_ptr<FileWriter>;
-    using LocalReaderPtr = std::shared_ptr<FileReader>;
+    using LocalReaderPtr = std::weak_ptr<FileReader>;
 
     enum class State {
         DOWNLOADED,
@@ -74,7 +74,7 @@ public:
     FileBlock(size_t offset, size_t size, const Key& key, IFileCache* cache, 
State download_state,
               CacheType cache_type);
 
-    ~FileBlock() = default;
+    ~FileBlock();
 
     State state() const;
 
@@ -110,7 +110,7 @@ public:
     Status append(Slice data);
 
     // read data from cache file
-    Status read_at(Slice buffer, size_t offset_);
+    Status read_at(Slice buffer, size_t read_offset);
 
     // finish write, release the file writer
     Status finalize_write();
diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp 
b/be/src/io/cache/block/block_lru_file_cache.cpp
index af614560a9..62e9a3b31d 100644
--- a/be/src/io/cache/block/block_lru_file_cache.cpp
+++ b/be/src/io/cache/block/block_lru_file_cache.cpp
@@ -72,6 +72,7 @@ 
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_size, MetricU
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_size, 
MetricUnit::BYTES);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_elements, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_elements, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_segment_reader_cache_size, 
MetricUnit::NOUNIT);
 
 LRUFileCache::LRUFileCache(const std::string& cache_base_path,
                            const FileCacheSettings& cache_settings)
@@ -104,6 +105,7 @@ LRUFileCache::LRUFileCache(const std::string& 
cache_base_path,
     INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_size);
     INT_UGAUGE_METRIC_REGISTER(_entity, 
file_cache_disposable_queue_max_elements);
     INT_UGAUGE_METRIC_REGISTER(_entity, 
file_cache_disposable_queue_curr_elements);
+    INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_segment_reader_cache_size);
 
     LOG(INFO) << fmt::format(
             "file cache path={}, disposable queue size={} elements={}, index 
queue size={} "
@@ -1116,6 +1118,7 @@ void LRUFileCache::update_cache_metrics() const {
     
file_cache_disposable_queue_curr_size->set_value(_disposable_queue.get_total_cache_size(l));
     
file_cache_disposable_queue_max_elements->set_value(_disposable_queue.get_max_element_size());
     
file_cache_disposable_queue_curr_elements->set_value(_disposable_queue.get_elements_num(l));
+    
file_cache_segment_reader_cache_size->set_value(IFileCache::file_reader_cache_size());
 }
 
 } // namespace io
diff --git a/be/src/io/cache/block/block_lru_file_cache.h 
b/be/src/io/cache/block/block_lru_file_cache.h
index ef9546bb5e..5a15b10ba2 100644
--- a/be/src/io/cache/block/block_lru_file_cache.h
+++ b/be/src/io/cache/block/block_lru_file_cache.h
@@ -221,6 +221,7 @@ private:
     UIntGauge* file_cache_disposable_queue_curr_size = nullptr;
     UIntGauge* file_cache_disposable_queue_max_elements = nullptr;
     UIntGauge* file_cache_disposable_queue_curr_elements = nullptr;
+    UIntGauge* file_cache_segment_reader_cache_size = nullptr;
 };
 
 } // namespace io
diff --git a/be/test/io/cache/file_block_cache_test.cpp 
b/be/test/io/cache/file_block_cache_test.cpp
index 1f5942ac17..1c60447035 100644
--- a/be/test/io/cache/file_block_cache_test.cpp
+++ b/be/test/io/cache/file_block_cache_test.cpp
@@ -816,4 +816,148 @@ TEST(LRUFileCache, query_limit_dcheck) {
     }
 }
 
+// Fills a 15-byte cache with three blocks, reading each one so its reader gets cached,
+// then requests a further 10-byte range. Afterwards the reader of the block at offset 0
+// must be gone from the shared reader cache and exactly two readers must remain.
+TEST(LRUFileCache, fd_cache_remove) {
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    doris::config::enable_file_cache_query_limit = true;
+    fs::create_directories(cache_base_path);
+    io::FileCacheSettings settings;
+    settings.index_queue_elements = 0;
+    settings.index_queue_size = 0;
+    settings.disposable_queue_size = 0;
+    settings.disposable_queue_elements = 0;
+    settings.query_queue_size = 15;
+    settings.query_queue_elements = 5;
+    settings.max_file_segment_size = 10;
+    settings.max_query_cache_size = 15;
+    settings.total_size = 15;
+    io::LRUFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    io::CacheContext context;
+    context.cache_type = io::CacheType::NORMAL;
+    auto key = io::LRUFileCache::hash("key1");
+    {
+        auto holder = cache.get_or_set(key, 0, 9, context); /// Add range [0, 8]
+        auto segments = fromHolder(holder);
+        ASSERT_GE(segments.size(), 1);
+        assert_range(1, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        assert_range(2, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING);
+        download(segments[0]);
+        std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9);
+        // A failed read must fail the test instead of being silently dropped.
+        ASSERT_TRUE(segments[0]->read_at(Slice(buffer.get(), 9), 0).ok());
+        EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
+    }
+    {
+        auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9, 9]
+        auto segments = fromHolder(holder);
+        ASSERT_GE(segments.size(), 1);
+        assert_range(1, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        assert_range(2, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::DOWNLOADING);
+        download(segments[0]);
+        std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1);
+        ASSERT_TRUE(segments[0]->read_at(Slice(buffer.get(), 1), 0).ok());
+        EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 9)));
+    }
+    {
+        auto holder = cache.get_or_set(key, 10, 5, context); /// Add range [10, 14]
+        auto segments = fromHolder(holder);
+        ASSERT_GE(segments.size(), 1);
+        assert_range(3, segments[0], io::FileBlock::Range(10, 14), io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        assert_range(4, segments[0], io::FileBlock::Range(10, 14),
+                     io::FileBlock::State::DOWNLOADING);
+        download(segments[0]);
+        std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5);
+        ASSERT_TRUE(segments[0]->read_at(Slice(buffer.get(), 5), 0).ok());
+        EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 10)));
+    }
+    {
+        auto holder = cache.get_or_set(key, 15, 10, context); /// Add range [15, 24]
+        auto segments = fromHolder(holder);
+        ASSERT_EQ(segments.size(), 1);
+        assert_range(3, segments[0], io::FileBlock::Range(15, 24), io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        assert_range(4, segments[0], io::FileBlock::Range(15, 24),
+                     io::FileBlock::State::DOWNLOADING);
+        download(segments[0]);
+        std::unique_ptr<char[]> buffer = std::make_unique<char[]>(10);
+        ASSERT_TRUE(segments[0]->read_at(Slice(buffer.get(), 10), 0).ok());
+        EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 15)));
+    }
+    EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
+    EXPECT_EQ(io::IFileCache::file_reader_cache_size(), 2);
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
+// Lowers the reader-cache limit to 2, reads three blocks so that three readers are
+// cached in turn, and checks that the first reader (offset 0) was evicted and that
+// exactly two readers remain.
+TEST(LRUFileCache, fd_cache_evict) {
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    doris::config::enable_file_cache_query_limit = true;
+    fs::create_directories(cache_base_path);
+    io::FileCacheSettings settings;
+    settings.index_queue_elements = 0;
+    settings.index_queue_size = 0;
+    settings.disposable_queue_size = 0;
+    settings.disposable_queue_elements = 0;
+    settings.query_queue_size = 15;
+    settings.query_queue_elements = 5;
+    settings.max_file_segment_size = 10;
+    settings.max_query_cache_size = 15;
+    settings.total_size = 15;
+    io::LRUFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    io::CacheContext context;
+    context.cache_type = io::CacheType::NORMAL;
+    auto key = io::LRUFileCache::hash("key1");
+    // Put the global limit back when the test ends (including early ASSERT_* returns) so the
+    // lowered value does not leak into tests that run afterwards.
+    struct ReaderCacheLimitRestorer {
+        int64_t saved = config::file_cache_max_file_reader_cache_size;
+        ~ReaderCacheLimitRestorer() { config::file_cache_max_file_reader_cache_size = saved; }
+    } limit_restorer;
+    config::file_cache_max_file_reader_cache_size = 2;
+    {
+        auto holder = cache.get_or_set(key, 0, 9, context); /// Add range [0, 8]
+        auto segments = fromHolder(holder);
+        ASSERT_GE(segments.size(), 1);
+        assert_range(1, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        assert_range(2, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING);
+        download(segments[0]);
+        std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9);
+        // A failed read must fail the test instead of being silently dropped.
+        ASSERT_TRUE(segments[0]->read_at(Slice(buffer.get(), 9), 0).ok());
+        EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
+    }
+    {
+        auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9, 9]
+        auto segments = fromHolder(holder);
+        ASSERT_GE(segments.size(), 1);
+        assert_range(1, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        assert_range(2, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::DOWNLOADING);
+        download(segments[0]);
+        std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1);
+        ASSERT_TRUE(segments[0]->read_at(Slice(buffer.get(), 1), 0).ok());
+        EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 9)));
+    }
+    {
+        auto holder = cache.get_or_set(key, 10, 5, context); /// Add range [10, 14]
+        auto segments = fromHolder(holder);
+        ASSERT_GE(segments.size(), 1);
+        assert_range(3, segments[0], io::FileBlock::Range(10, 14), io::FileBlock::State::EMPTY);
+        ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
+        assert_range(4, segments[0], io::FileBlock::Range(10, 14),
+                     io::FileBlock::State::DOWNLOADING);
+        download(segments[0]);
+        std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5);
+        ASSERT_TRUE(segments[0]->read_at(Slice(buffer.get(), 5), 0).ok());
+        EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 10)));
+    }
+    EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
+    EXPECT_EQ(io::IFileCache::file_reader_cache_size(), 2);
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
 } // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to