This is an automated email from the ASF dual-hosted git repository.

pengxiangyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a378a6024d [fix](cooldown)Support change be.conf: max_sub_cache_file_size (#17773)
a378a6024d is described below

commit a378a6024d21013e78c99ca50aa0c1d2f0554cc9
Author: pengxiangyu <diablo...@163.com>
AuthorDate: Wed Mar 15 12:19:12 2023 +0800

    [fix](cooldown)Support change be.conf: max_sub_cache_file_size (#17773)
    
    * delete files when sub file cache size is changed.
---
 be/src/io/cache/sub_file_cache.cpp | 63 ++++++++++++++++++++++++--------------
 be/src/io/cache/sub_file_cache.h   |  6 ++--
 2 files changed, 44 insertions(+), 25 deletions(-)

diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp
index dbc7d1896d..4b21e37505 100644
--- a/be/src/io/cache/sub_file_cache.cpp
+++ b/be/src/io/cache/sub_file_cache.cpp
@@ -62,7 +62,7 @@ Status SubFileCache::read_at(size_t offset, Slice result, const IOContext& io_ct
 
 Status SubFileCache::read_at_impl(size_t offset, Slice result, const 
IOContext& io_ctx,
                                   size_t* bytes_read) {
-    _init();
+    RETURN_IF_ERROR(_init());
     if (io_ctx.reader_type != READER_QUERY) {
         return _remote_file_reader->read_at(offset, result, io_ctx, 
bytes_read);
     }
@@ -225,7 +225,7 @@ Status SubFileCache::_get_need_cache_offsets(size_t offset, size_t req_size,
 }
 
 Status SubFileCache::clean_timeout_cache() {
-    _init();
+    RETURN_IF_ERROR(_init());
     SubGcQueue gc_queue;
     _gc_lru_queue.swap(gc_queue);
     std::vector<size_t> timeout_keys;
@@ -301,31 +301,48 @@ Status SubFileCache::_clean_cache_internal(size_t offset, size_t* cleaned_size)
     return _remove_cache_and_done(cache_file, done_file, cleaned_size);
 }
 
-void SubFileCache::_init() {
-    auto init = [this] {
-        std::vector<Path> cache_names;
+Status SubFileCache::_init() {
+    if (_is_inited) {
+        return Status::OK();
+    }
+    std::vector<Path> cache_names;
 
-        std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
-        if (!_get_dir_files_and_remove_unfinished(_cache_dir, 
cache_names).ok()) {
-            return;
-        }
-        for (const auto& file : cache_names) {
-            auto str_vec = split(file.native(), "_");
-            size_t offset = std::strtoul(str_vec[str_vec.size() - 1].c_str(), 
nullptr, 10);
+    std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
+    size_t cache_file_size = 0;
+    RETURN_IF_ERROR(_get_dir_files_and_remove_unfinished(_cache_dir, 
cache_names));
+    std::map<int64_t, int64_t> expect_file_size_map;
+    RETURN_IF_ERROR(_get_all_sub_file_size(&expect_file_size_map));
+    for (const auto& file : cache_names) {
+        auto str_vec = split(file.native(), "_");
+        size_t offset = std::strtoul(str_vec[str_vec.size() - 1].c_str(), 
nullptr, 10);
 
-            size_t file_size = 0;
-            auto path = _cache_dir / file;
-            if (io::global_local_filesystem()->file_size(path, 
&file_size).ok()) {
-                _last_match_times[offset] = time(nullptr);
-                _cache_file_size += file_size;
-            } else {
-                LOG(WARNING) << "get local cache file size failed:" << 
path.native();
-                _clean_cache_internal(offset, nullptr);
-            }
+        size_t file_size = 0;
+        auto path = _cache_dir / file;
+        RETURN_IF_ERROR(io::global_local_filesystem()->file_size(path, 
&file_size));
+        if (expect_file_size_map.find(offset) == expect_file_size_map.end() ||
+            expect_file_size_map[offset] != file_size) {
+            LOG(INFO) << "Delete invalid cache file: " << path.native() << ", 
offset: " << offset
+                      << ", size: " << file_size;
+            _clean_cache_internal(offset, nullptr);
+            continue;
         }
-    };
+        _last_match_times[offset] = time(nullptr);
+        cache_file_size += file_size;
+    }
+    _cache_file_size = cache_file_size;
+    _is_inited = true;
+    return Status::OK();
+}
 
-    std::call_once(init_flag, init);
+Status SubFileCache::_get_all_sub_file_size(std::map<int64_t, int64_t>* 
expect_file_size_map) {
+    std::vector<size_t> cache_offsets;
+    RETURN_IF_ERROR(_get_need_cache_offsets(0, _remote_file_reader->size(), 
&cache_offsets));
+    for (int i = 0; i < cache_offsets.size() - 1; ++i) {
+        expect_file_size_map->emplace(cache_offsets[i], 
config::max_sub_cache_file_size);
+    }
+    expect_file_size_map->emplace(cache_offsets[cache_offsets.size() - 1],
+                                  _remote_file_reader->size() % 
config::max_sub_cache_file_size);
+    return Status::OK();
 }
 
 } // namespace io
diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h
index 6f5dfcd8f5..f8a1a48ba5 100644
--- a/be/src/io/cache/sub_file_cache.h
+++ b/be/src/io/cache/sub_file_cache.h
@@ -75,7 +75,9 @@ private:
 
     std::pair<Path, Path> _cache_path(size_t offset);
 
-    void _init();
+    Status _init();
+
+    Status _get_all_sub_file_size(std::map<int64_t, int64_t>* 
expect_file_size_map);
 
 private:
     struct SubFileInfo {
@@ -97,7 +99,7 @@ private:
     // offset_begin -> local file reader
     std::map<size_t, io::FileReaderSPtr> _cache_file_readers;
 
-    std::once_flag init_flag;
+    bool _is_inited = false;
 };
 
 } // namespace io


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to