This is an automated email from the ASF dual-hosted git repository.

pengxiangyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new a378a6024d [fix](cooldown)Support change be.conf: max_sub_cache_file_size (#17773)
a378a6024d is described below

commit a378a6024d21013e78c99ca50aa0c1d2f0554cc9
Author: pengxiangyu <diablo...@163.com>
AuthorDate: Wed Mar 15 12:19:12 2023 +0800

    [fix](cooldown)Support change be.conf: max_sub_cache_file_size (#17773)

    * delete files when sub file cache size is changed.
---
 be/src/io/cache/sub_file_cache.cpp | 63 ++++++++++++++++++++++++--------------
 be/src/io/cache/sub_file_cache.h   |  6 ++--
 2 files changed, 44 insertions(+), 25 deletions(-)

diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp
index dbc7d1896d..4b21e37505 100644
--- a/be/src/io/cache/sub_file_cache.cpp
+++ b/be/src/io/cache/sub_file_cache.cpp
@@ -62,7 +62,7 @@ Status SubFileCache::read_at(size_t offset, Slice result, const IOContext& io_ct
 
 Status SubFileCache::read_at_impl(size_t offset, Slice result, const IOContext& io_ctx,
                                   size_t* bytes_read) {
-    _init();
+    RETURN_IF_ERROR(_init());
     if (io_ctx.reader_type != READER_QUERY) {
         return _remote_file_reader->read_at(offset, result, io_ctx, bytes_read);
     }
@@ -225,7 +225,7 @@ Status SubFileCache::_get_need_cache_offsets(size_t offset, size_t req_size,
 }
 
 Status SubFileCache::clean_timeout_cache() {
-    _init();
+    RETURN_IF_ERROR(_init());
     SubGcQueue gc_queue;
     _gc_lru_queue.swap(gc_queue);
     std::vector<size_t> timeout_keys;
@@ -301,31 +301,48 @@ Status SubFileCache::_clean_cache_internal(size_t offset, size_t* cleaned_size)
     return _remove_cache_and_done(cache_file, done_file, cleaned_size);
 }
 
-void SubFileCache::_init() {
-    auto init = [this] {
-        std::vector<Path> cache_names;
+Status SubFileCache::_init() {
+    if (_is_inited) {
+        return Status::OK();
+    }
+    std::vector<Path> cache_names;
 
-        std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
-        if (!_get_dir_files_and_remove_unfinished(_cache_dir, cache_names).ok()) {
-            return;
-        }
-        for (const auto& file : cache_names) {
-            auto str_vec = split(file.native(), "_");
-            size_t offset = std::strtoul(str_vec[str_vec.size() - 1].c_str(), nullptr, 10);
+    std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
+    size_t cache_file_size = 0;
+    RETURN_IF_ERROR(_get_dir_files_and_remove_unfinished(_cache_dir, cache_names));
+    std::map<int64_t, int64_t> expect_file_size_map;
+    RETURN_IF_ERROR(_get_all_sub_file_size(&expect_file_size_map));
+    for (const auto& file : cache_names) {
+        auto str_vec = split(file.native(), "_");
+        size_t offset = std::strtoul(str_vec[str_vec.size() - 1].c_str(), nullptr, 10);
 
-            size_t file_size = 0;
-            auto path = _cache_dir / file;
-            if (io::global_local_filesystem()->file_size(path, &file_size).ok()) {
-                _last_match_times[offset] = time(nullptr);
-                _cache_file_size += file_size;
-            } else {
-                LOG(WARNING) << "get local cache file size failed:" << path.native();
-                _clean_cache_internal(offset, nullptr);
-            }
+        size_t file_size = 0;
+        auto path = _cache_dir / file;
+        RETURN_IF_ERROR(io::global_local_filesystem()->file_size(path, &file_size));
+        if (expect_file_size_map.find(offset) == expect_file_size_map.end() ||
+            expect_file_size_map[offset] != file_size) {
+            LOG(INFO) << "Delete invalid cache file: " << path.native() << ", offset: " << offset
+                      << ", size: " << file_size;
+            _clean_cache_internal(offset, nullptr);
+            continue;
         }
-    };
+        _last_match_times[offset] = time(nullptr);
+        cache_file_size += file_size;
+    }
+    _cache_file_size = cache_file_size;
+    _is_inited = true;
+    return Status::OK();
+}
 
-    std::call_once(init_flag, init);
+Status SubFileCache::_get_all_sub_file_size(std::map<int64_t, int64_t>* expect_file_size_map) {
+    std::vector<size_t> cache_offsets;
+    RETURN_IF_ERROR(_get_need_cache_offsets(0, _remote_file_reader->size(), &cache_offsets));
+    for (int i = 0; i < cache_offsets.size() - 1; ++i) {
+        expect_file_size_map->emplace(cache_offsets[i], config::max_sub_cache_file_size);
+    }
+    expect_file_size_map->emplace(cache_offsets[cache_offsets.size() - 1],
+                                  _remote_file_reader->size() % config::max_sub_cache_file_size);
+    return Status::OK();
 }
 
 } // namespace io
diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h
index 6f5dfcd8f5..f8a1a48ba5 100644
--- a/be/src/io/cache/sub_file_cache.h
+++ b/be/src/io/cache/sub_file_cache.h
@@ -75,7 +75,9 @@ private:
 
     std::pair<Path, Path> _cache_path(size_t offset);
 
-    void _init();
+    Status _init();
+
+    Status _get_all_sub_file_size(std::map<int64_t, int64_t>* expect_file_size_map);
 
 private:
     struct SubFileInfo {
@@ -97,7 +99,7 @@ private:
     // offset_begin -> local file reader
     std::map<size_t, io::FileReaderSPtr> _cache_file_readers;
 
-    std::once_flag init_flag;
+    bool _is_inited = false;
 };
 
 } // namespace io


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org