This is an automated email from the ASF dual-hosted git repository. ashingau pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 30c21789c8 [opt](filecache) use weak_ptr to cache the file handle of file segment (#21975) 30c21789c8 is described below commit 30c21789c8f45831af0435a32af158d446473abc Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Mon Jul 24 19:09:27 2023 +0800 [opt](filecache) use weak_ptr to cache the file handle of file segment (#21975) Use weak_ptr to cache the file handles of file segments. The maximum number of cached file handles can be configured with `file_cache_max_file_reader_cache_size` (default `1000000`). Users can inspect the number of cached file handles by requesting the BE metrics endpoint `http://be_host:be_webserver_port/metrics`: ``` # TYPE doris_be_file_cache_segment_reader_cache_size gauge doris_be_file_cache_segment_reader_cache_size{path="/mnt/datadisk1/gaoxin/file_cache"} 2500 ``` --- be/src/common/config.cpp | 1 + be/src/common/config.h | 2 + be/src/io/cache/block/block_file_cache.cpp | 43 ++++++++ be/src/io/cache/block/block_file_cache.h | 25 +++++ be/src/io/cache/block/block_file_segment.cpp | 26 +++-- be/src/io/cache/block/block_file_segment.h | 6 +- be/src/io/cache/block/block_lru_file_cache.cpp | 3 + be/src/io/cache/block/block_lru_file_cache.h | 1 + be/test/io/cache/file_block_cache_test.cpp | 144 +++++++++++++++++++++++++ 9 files changed, 238 insertions(+), 13 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c727202ce3..9a0b2a0e3a 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1009,6 +1009,7 @@ DEFINE_mInt32(s3_write_buffer_size, "5242880"); // can at most buffer 50MB data.
And the num of multi part upload task is // s3_write_buffer_whole_size / s3_write_buffer_size DEFINE_mInt32(s3_write_buffer_whole_size, "524288000"); +DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000"); //disable shrink memory by default DEFINE_Bool(enable_shrink_memory, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 05d7fcc658..169bbbac32 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1029,6 +1029,8 @@ DECLARE_mInt32(s3_write_buffer_size); // can at most buffer 50MB data. And the num of multi part upload task is // s3_write_buffer_whole_size / s3_write_buffer_size DECLARE_mInt32(s3_write_buffer_whole_size); +// the max number of cached file handle for block segemnt +DECLARE_mInt64(file_cache_max_file_reader_cache_size); //enable shrink memory DECLARE_Bool(enable_shrink_memory); // enable cache for high concurrent point query work load diff --git a/be/src/io/cache/block/block_file_cache.cpp b/be/src/io/cache/block/block_file_cache.cpp index 5c78c46851..0cd546aeaa 100644 --- a/be/src/io/cache/block/block_file_cache.cpp +++ b/be/src/io/cache/block/block_file_cache.cpp @@ -162,5 +162,48 @@ void IFileCache::QueryFileCacheContext::reserve(const Key& key, size_t offset, s } } +void IFileCache::set_read_only(bool read_only) { + s_read_only = read_only; + if (read_only) { + std::lock_guard lock(s_file_reader_cache_mtx); + s_file_reader_cache.clear(); + s_file_name_to_reader.clear(); + } +} + +std::weak_ptr<FileReader> IFileCache::cache_file_reader(const AccessKeyAndOffset& key, + std::shared_ptr<FileReader> file_reader) { + std::weak_ptr<FileReader> wp; + if (!s_read_only) [[likely]] { + std::lock_guard lock(s_file_reader_cache_mtx); + if (config::file_cache_max_file_reader_cache_size == s_file_reader_cache.size()) { + s_file_name_to_reader.erase(s_file_reader_cache.back().first); + s_file_reader_cache.pop_back(); + } + wp = file_reader; + s_file_reader_cache.emplace_front(key, std::move(file_reader)); + 
s_file_name_to_reader.insert(std::make_pair(key, s_file_reader_cache.begin())); + } + return wp; +} + +void IFileCache::remove_file_reader(const AccessKeyAndOffset& key) { + std::lock_guard lock(s_file_reader_cache_mtx); + if (auto iter = s_file_name_to_reader.find(key); iter != s_file_name_to_reader.end()) { + s_file_reader_cache.erase(iter->second); + s_file_name_to_reader.erase(key); + } +} + +bool IFileCache::contains_file_reader(const AccessKeyAndOffset& key) { + std::lock_guard lock(s_file_reader_cache_mtx); + return s_file_name_to_reader.find(key) != s_file_name_to_reader.end(); +} + +size_t IFileCache::file_reader_cache_size() { + std::lock_guard lock(s_file_reader_cache_mtx); + return s_file_name_to_reader.size(); +} + } // namespace io } // namespace doris diff --git a/be/src/io/cache/block/block_file_cache.h b/be/src/io/cache/block/block_file_cache.h index 0f1cc347d5..5d0dd1f06f 100644 --- a/be/src/io/cache/block/block_file_cache.h +++ b/be/src/io/cache/block/block_file_cache.h @@ -37,6 +37,7 @@ #include "common/status.h" #include "io/cache/block/block_file_cache_fwd.h" #include "io/cache/block/block_file_cache_settings.h" +#include "io/fs/file_reader.h" #include "io/io_common.h" #include "util/hash_util.hpp" #include "vec/common/uint128.h" @@ -54,6 +55,7 @@ enum CacheType { NORMAL, DISPOSABLE, }; + struct CacheContext { CacheContext(const IOContext* io_ctx) { if (io_ctx->read_segment_index) { @@ -292,6 +294,29 @@ public: }; using QueryFileCacheContextHolderPtr = std::unique_ptr<QueryFileCacheContextHolder>; QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& query_id); + +private: + static inline std::list<std::pair<AccessKeyAndOffset, std::shared_ptr<FileReader>>> + s_file_reader_cache; + static inline std::unordered_map<AccessKeyAndOffset, decltype(s_file_reader_cache.begin()), + KeyAndOffsetHash> + s_file_name_to_reader; + static inline std::mutex s_file_reader_cache_mtx; + static inline std::atomic_bool s_read_only {false}; + 
+public: + static void set_read_only(bool read_only); + + static bool read_only() { return s_read_only; } + + static std::weak_ptr<FileReader> cache_file_reader(const AccessKeyAndOffset& key, + std::shared_ptr<FileReader> file_reader); + + static void remove_file_reader(const AccessKeyAndOffset& key); + + // use for test + static bool contains_file_reader(const AccessKeyAndOffset& key); + static size_t file_reader_cache_size(); }; using CloudFileCachePtr = IFileCache*; diff --git a/be/src/io/cache/block/block_file_segment.cpp b/be/src/io/cache/block/block_file_segment.cpp index 130540f3e7..38d230d9bb 100644 --- a/be/src/io/cache/block/block_file_segment.cpp +++ b/be/src/io/cache/block/block_file_segment.cpp @@ -68,6 +68,13 @@ FileBlock::FileBlock(size_t offset_, size_t size_, const Key& key_, IFileCache* } } +FileBlock::~FileBlock() { + std::shared_ptr<FileReader> reader; + if ((reader = _cache_reader.lock())) { + IFileCache::remove_file_reader(std::make_pair(_file_key, offset())); + } +} + FileBlock::State FileBlock::state() const { std::lock_guard segment_lock(_mutex); return _download_state; @@ -171,21 +178,20 @@ std::string FileBlock::get_path_in_local_cache() const { return _cache->get_path_in_local_cache(key(), offset(), _cache_type); } -Status FileBlock::read_at(Slice buffer, size_t offset) { +Status FileBlock::read_at(Slice buffer, size_t read_offset) { Status st = Status::OK(); - if (!_cache_reader) { - std::lock_guard segment_lock(_mutex); - if (!_cache_reader) { + std::shared_ptr<FileReader> reader; + if (!(reader = _cache_reader.lock())) { + std::lock_guard<std::mutex> lock(_mutex); + if (!(reader = _cache_reader.lock())) { auto download_path = get_path_in_local_cache(); - st = global_local_filesystem()->open_file(download_path, &_cache_reader); - if (!st) { - _cache_reader.reset(); - return st; - } + RETURN_IF_ERROR(global_local_filesystem()->open_file(download_path, &reader)); + _cache_reader = + IFileCache::cache_file_reader(std::make_pair(_file_key, 
offset()), reader); } } size_t bytes_reads = buffer.size; - RETURN_IF_ERROR(_cache_reader->read_at(offset, buffer, &bytes_reads)); + RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads)); DCHECK(bytes_reads == buffer.size); return st; } diff --git a/be/src/io/cache/block/block_file_segment.h b/be/src/io/cache/block/block_file_segment.h index 6826a673f9..b462259931 100644 --- a/be/src/io/cache/block/block_file_segment.h +++ b/be/src/io/cache/block/block_file_segment.h @@ -52,7 +52,7 @@ class FileBlock { public: using Key = IFileCache::Key; using LocalWriterPtr = std::unique_ptr<FileWriter>; - using LocalReaderPtr = std::shared_ptr<FileReader>; + using LocalReaderPtr = std::weak_ptr<FileReader>; enum class State { DOWNLOADED, @@ -74,7 +74,7 @@ public: FileBlock(size_t offset, size_t size, const Key& key, IFileCache* cache, State download_state, CacheType cache_type); - ~FileBlock() = default; + ~FileBlock(); State state() const; @@ -110,7 +110,7 @@ public: Status append(Slice data); // read data from cache file - Status read_at(Slice buffer, size_t offset_); + Status read_at(Slice buffer, size_t read_offset); // finish write, release the file writer Status finalize_write(); diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index af614560a9..62e9a3b31d 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -72,6 +72,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_size, MetricU DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_size, MetricUnit::BYTES); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_elements, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_segment_reader_cache_size, MetricUnit::NOUNIT); LRUFileCache::LRUFileCache(const std::string& 
cache_base_path, const FileCacheSettings& cache_settings) @@ -104,6 +105,7 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path, INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_size); INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_max_elements); INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_elements); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_segment_reader_cache_size); LOG(INFO) << fmt::format( "file cache path={}, disposable queue size={} elements={}, index queue size={} " @@ -1116,6 +1118,7 @@ void LRUFileCache::update_cache_metrics() const { file_cache_disposable_queue_curr_size->set_value(_disposable_queue.get_total_cache_size(l)); file_cache_disposable_queue_max_elements->set_value(_disposable_queue.get_max_element_size()); file_cache_disposable_queue_curr_elements->set_value(_disposable_queue.get_elements_num(l)); + file_cache_segment_reader_cache_size->set_value(IFileCache::file_reader_cache_size()); } } // namespace io diff --git a/be/src/io/cache/block/block_lru_file_cache.h b/be/src/io/cache/block/block_lru_file_cache.h index ef9546bb5e..5a15b10ba2 100644 --- a/be/src/io/cache/block/block_lru_file_cache.h +++ b/be/src/io/cache/block/block_lru_file_cache.h @@ -221,6 +221,7 @@ private: UIntGauge* file_cache_disposable_queue_curr_size = nullptr; UIntGauge* file_cache_disposable_queue_max_elements = nullptr; UIntGauge* file_cache_disposable_queue_curr_elements = nullptr; + UIntGauge* file_cache_segment_reader_cache_size = nullptr; }; } // namespace io diff --git a/be/test/io/cache/file_block_cache_test.cpp b/be/test/io/cache/file_block_cache_test.cpp index 1f5942ac17..1c60447035 100644 --- a/be/test/io/cache/file_block_cache_test.cpp +++ b/be/test/io/cache/file_block_cache_test.cpp @@ -816,4 +816,148 @@ TEST(LRUFileCache, query_limit_dcheck) { } } +TEST(LRUFileCache, fd_cache_remove) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + 
doris::config::enable_file_cache_query_limit = true; + fs::create_directories(cache_base_path); + io::FileCacheSettings settings; + settings.index_queue_elements = 0; + settings.index_queue_size = 0; + settings.disposable_queue_size = 0; + settings.disposable_queue_elements = 0; + settings.query_queue_size = 15; + settings.query_queue_elements = 5; + settings.max_file_segment_size = 10; + settings.max_query_cache_size = 15; + settings.total_size = 15; + io::LRUFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + io::CacheContext context; + context.cache_type = io::CacheType::NORMAL; + auto key = io::LRUFileCache::hash("key1"); + { + auto holder = cache.get_or_set(key, 0, 9, context); /// Add range [0, 8] + auto segments = fromHolder(holder); + ASSERT_GE(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING); + download(segments[0]); + std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9); + segments[0]->read_at(Slice(buffer.get(), 9), 0); + EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 0))); + } + { + auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9, 9] + auto segments = fromHolder(holder); + ASSERT_GE(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::DOWNLOADING); + download(segments[0]); + std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1); + segments[0]->read_at(Slice(buffer.get(), 1), 0); + EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 9))); + } + { + auto holder = cache.get_or_set(key, 10, 5, 
context); /// Add range [10, 14] + auto segments = fromHolder(holder); + ASSERT_GE(segments.size(), 1); + assert_range(3, segments[0], io::FileBlock::Range(10, 14), io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(4, segments[0], io::FileBlock::Range(10, 14), + io::FileBlock::State::DOWNLOADING); + download(segments[0]); + std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5); + segments[0]->read_at(Slice(buffer.get(), 5), 0); + EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 10))); + } + { + auto holder = cache.get_or_set(key, 15, 10, context); /// Add range [15, 24] + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(3, segments[0], io::FileBlock::Range(15, 24), io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(4, segments[0], io::FileBlock::Range(15, 24), + io::FileBlock::State::DOWNLOADING); + download(segments[0]); + std::unique_ptr<char[]> buffer = std::make_unique<char[]>(10); + segments[0]->read_at(Slice(buffer.get(), 10), 0); + EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 15))); + } + EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0))); + EXPECT_EQ(io::IFileCache::file_reader_cache_size(), 2); + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST(LRUFileCache, fd_cache_evict) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + doris::config::enable_file_cache_query_limit = true; + fs::create_directories(cache_base_path); + io::FileCacheSettings settings; + settings.index_queue_elements = 0; + settings.index_queue_size = 0; + settings.disposable_queue_size = 0; + settings.disposable_queue_elements = 0; + settings.query_queue_size = 15; + settings.query_queue_elements = 5; + settings.max_file_segment_size = 10; + 
settings.max_query_cache_size = 15; + settings.total_size = 15; + io::LRUFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + io::CacheContext context; + context.cache_type = io::CacheType::NORMAL; + auto key = io::LRUFileCache::hash("key1"); + config::file_cache_max_file_reader_cache_size = 2; + { + auto holder = cache.get_or_set(key, 0, 9, context); /// Add range [0, 8] + auto segments = fromHolder(holder); + ASSERT_GE(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING); + download(segments[0]); + std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9); + segments[0]->read_at(Slice(buffer.get(), 9), 0); + EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 0))); + } + { + auto holder = cache.get_or_set(key, 9, 1, context); /// Add range [9, 9] + auto segments = fromHolder(holder); + ASSERT_GE(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(2, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::DOWNLOADING); + download(segments[0]); + std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1); + segments[0]->read_at(Slice(buffer.get(), 1), 0); + EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 9))); + } + { + auto holder = cache.get_or_set(key, 10, 5, context); /// Add range [10, 14] + auto segments = fromHolder(holder); + ASSERT_GE(segments.size(), 1); + assert_range(3, segments[0], io::FileBlock::Range(10, 14), io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + assert_range(4, segments[0], io::FileBlock::Range(10, 14), 
+ io::FileBlock::State::DOWNLOADING); + download(segments[0]); + std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5); + segments[0]->read_at(Slice(buffer.get(), 5), 0); + EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 10))); + } + EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0))); + EXPECT_EQ(io::IFileCache::file_reader_cache_size(), 2); + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + } // namespace doris::io --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org