This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7a75c750278 [chore](file-cache) Enable file cache for cloud mode by force (#41357) (#41421)
7a75c750278 is described below

commit 7a75c750278ee44889ab47c047c1f532bd9b6964
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Sun Sep 29 09:44:48 2024 +0800

    [chore](file-cache) Enable file cache for cloud mode by force (#41357) (#41421)
    
    ## Proposed changes
    
    As title.
    
    pick:#41357
---
 be/src/cloud/cloud_rowset_writer.cpp       |  3 +-
 be/src/io/cache/block_file_cache_factory.h |  5 +-
 be/src/runtime/exec_env_init.cpp           | 82 ++++++++++++++++--------------
 3 files changed, 50 insertions(+), 40 deletions(-)

diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp
index 7753bf7b65b..5f878f59d5c 100644
--- a/be/src/cloud/cloud_rowset_writer.cpp
+++ b/be/src/cloud/cloud_rowset_writer.cpp
@@ -17,6 +17,7 @@
 
 #include "cloud/cloud_rowset_writer.h"
 
+#include "common/status.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "io/fs/file_system.h"
 #include "olap/rowset/rowset_factory.h"
@@ -34,7 +35,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
     if (_context.is_local_rowset()) {
         // In cloud mode, this branch implies it is an intermediate rowset for external merge sort,
         // we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`).
-        _context.tablet_path = io::FileCacheFactory::instance()->get_cache_path();
+        _context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path();
     } else {
         _rowset_meta->set_remote_storage_resource(*_context.storage_resource);
     }
diff --git a/be/src/io/cache/block_file_cache_factory.h b/be/src/io/cache/block_file_cache_factory.h
index 6365fab3105..d7b710876ce 100644
--- a/be/src/io/cache/block_file_cache_factory.h
+++ b/be/src/io/cache/block_file_cache_factory.h
@@ -21,7 +21,9 @@
 #pragma once
 
 #include <memory>
+#include <optional>
 #include <string>
+#include <string_view>
 #include <vector>
 
 #include "common/status.h"
@@ -46,7 +48,8 @@ public:
 
     size_t try_release(const std::string& base_path);
 
-    const std::string& get_cache_path() {
+    std::string_view pick_one_cache_path() {
+        DCHECK(!_caches.empty());
         size_t cur_index = _next_index.fetch_add(1);
         return _caches[cur_index % _caches.size()]->get_base_path();
     }
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 37ac346198a..dbaa124a98c 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -269,7 +269,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
     init_file_cache_factory(cache_paths);
     doris::io::BeConfDataDirReader::init_be_conf_data_dir(store_paths, spill_store_paths,
                                                           cache_paths);
-
     _pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _workload_group_manager = new WorkloadGroupMgr();
@@ -391,50 +390,57 @@ Status ExecEnv::init_pipeline_task_scheduler() {
 
 void ExecEnv::init_file_cache_factory(std::vector<doris::CachePath>& cache_paths) {
     // Load file cache before starting up daemon threads to make sure StorageEngine is read.
-    if (doris::config::enable_file_cache) {
-        if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
-            config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
-            LOG_FATAL(
-                    "The config file_cache_each_block_size {} must less than or equal to config "
-                    "s3_write_buffer_size {} and config::s3_write_buffer_size % "
-                    "config::file_cache_each_block_size must be zero",
-                    config::file_cache_each_block_size, config::s3_write_buffer_size);
-            exit(-1);
-        }
-        std::unordered_set<std::string> cache_path_set;
-        Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
-        if (!rest) {
-            LOG(FATAL) << "parse config file cache path failed, path="
-                       << doris::config::file_cache_path;
+    if (!config::enable_file_cache) {
+        if (config::is_cloud_mode()) {
+            LOG(FATAL) << "Cloud mode requires to enable file cache, plz set "
+                          "config::enable_file_cache "
+                          "= true";
             exit(-1);
         }
-        std::vector<std::thread> file_cache_init_threads;
+        return;
+    }
+    if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
+        config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
+        LOG_FATAL(
+                "The config file_cache_each_block_size {} must less than or equal to config "
+                "s3_write_buffer_size {} and config::s3_write_buffer_size % "
+                "config::file_cache_each_block_size must be zero",
+                config::file_cache_each_block_size, config::s3_write_buffer_size);
+        exit(-1);
+    }
+    std::unordered_set<std::string> cache_path_set;
+    Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
+    if (!rest) {
+        LOG(FATAL) << "parse config file cache path failed, path="
+                   << doris::config::file_cache_path;
+        exit(-1);
+    }
+    std::vector<std::thread> file_cache_init_threads;
 
-        std::list<doris::Status> cache_status;
-        for (auto& cache_path : cache_paths) {
-            if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
-                LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
-                continue;
-            }
+    std::list<doris::Status> cache_status;
+    for (auto& cache_path : cache_paths) {
+        if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
+            LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
+            continue;
+        }
 
-            file_cache_init_threads.emplace_back([&, status = &cache_status.emplace_back()]() {
-                *status = doris::io::FileCacheFactory::instance()->create_file_cache(
-                        cache_path.path, cache_path.init_settings());
-            });
+        file_cache_init_threads.emplace_back([&, status = &cache_status.emplace_back()]() {
+            *status = doris::io::FileCacheFactory::instance()->create_file_cache(
+                    cache_path.path, cache_path.init_settings());
+        });
 
-            cache_path_set.emplace(cache_path.path);
-        }
+        cache_path_set.emplace(cache_path.path);
+    }
 
-        for (std::thread& thread : file_cache_init_threads) {
-            if (thread.joinable()) {
-                thread.join();
-            }
+    for (std::thread& thread : file_cache_init_threads) {
+        if (thread.joinable()) {
+            thread.join();
         }
-        for (const auto& status : cache_status) {
-            if (!status.ok()) {
-                LOG(FATAL) << "failed to init file cache, err: " << status;
-                exit(-1);
-            }
+    }
+    for (const auto& status : cache_status) {
+        if (!status.ok()) {
+            LOG(FATAL) << "failed to init file cache, err: " << status;
+            exit(-1);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to