This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7a75c750278 [chore](file-cache) Enable file cache for cloud mode by force (#41357) (#41421) 7a75c750278 is described below commit 7a75c750278ee44889ab47c047c1f532bd9b6964 Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Sun Sep 29 09:44:48 2024 +0800 [chore](file-cache) Enable file cache for cloud mode by force (#41357) (#41421) ## Proposed changes As title. pick:#41357 --- be/src/cloud/cloud_rowset_writer.cpp | 3 +- be/src/io/cache/block_file_cache_factory.h | 5 +- be/src/runtime/exec_env_init.cpp | 82 ++++++++++++++++-------------- 3 files changed, 50 insertions(+), 40 deletions(-) diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp index 7753bf7b65b..5f878f59d5c 100644 --- a/be/src/cloud/cloud_rowset_writer.cpp +++ b/be/src/cloud/cloud_rowset_writer.cpp @@ -17,6 +17,7 @@ #include "cloud/cloud_rowset_writer.h" +#include "common/status.h" #include "io/cache/block_file_cache_factory.h" #include "io/fs/file_system.h" #include "olap/rowset/rowset_factory.h" @@ -34,7 +35,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) if (_context.is_local_rowset()) { // In cloud mode, this branch implies it is an intermediate rowset for external merge sort, // we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`). - _context.tablet_path = io::FileCacheFactory::instance()->get_cache_path(); + _context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path(); } else { _rowset_meta->set_remote_storage_resource(*_context.storage_resource); } diff --git a/be/src/io/cache/block_file_cache_factory.h b/be/src/io/cache/block_file_cache_factory.h index 6365fab3105..d7b710876ce 100644 --- a/be/src/io/cache/block_file_cache_factory.h +++ b/be/src/io/cache/block_file_cache_factory.h @@ -21,7 +21,9 @@ #pragma once #include <memory> +#include <optional> #include <string> +#include <string_view> #include <vector> #include "common/status.h" @@ -46,7 +48,8 @@ public: size_t try_release(const std::string& base_path); - const std::string& get_cache_path() { + std::string_view pick_one_cache_path() { + DCHECK(!_caches.empty()); size_t cur_index = _next_index.fetch_add(1); return _caches[cur_index % _caches.size()]->get_base_path(); } diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 37ac346198a..dbaa124a98c 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -269,7 +269,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, init_file_cache_factory(cache_paths); doris::io::BeConfDataDirReader::init_be_conf_data_dir(store_paths, spill_store_paths, cache_paths); - _pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query RETURN_IF_ERROR(init_pipeline_task_scheduler()); _workload_group_manager = new WorkloadGroupMgr(); @@ -391,50 +390,57 @@ Status ExecEnv::init_pipeline_task_scheduler() { void ExecEnv::init_file_cache_factory(std::vector<doris::CachePath>& cache_paths) { // Load file cache before starting up daemon threads to make sure StorageEngine is read. - if (doris::config::enable_file_cache) { - if (config::file_cache_each_block_size > config::s3_write_buffer_size || - config::s3_write_buffer_size % config::file_cache_each_block_size != 0) { - LOG_FATAL( - "The config file_cache_each_block_size {} must less than or equal to config " - "s3_write_buffer_size {} and config::s3_write_buffer_size % " - "config::file_cache_each_block_size must be zero", - config::file_cache_each_block_size, config::s3_write_buffer_size); - exit(-1); - } - std::unordered_set<std::string> cache_path_set; - Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths); - if (!rest) { - LOG(FATAL) << "parse config file cache path failed, path=" - << doris::config::file_cache_path; + if (!config::enable_file_cache) { + if (config::is_cloud_mode()) { + LOG(FATAL) << "Cloud mode requires to enable file cache, plz set " + "config::enable_file_cache " + "= true"; exit(-1); } - std::vector<std::thread> file_cache_init_threads; + return; + } + if (config::file_cache_each_block_size > config::s3_write_buffer_size || + config::s3_write_buffer_size % config::file_cache_each_block_size != 0) { + LOG_FATAL( + "The config file_cache_each_block_size {} must less than or equal to config " + "s3_write_buffer_size {} and config::s3_write_buffer_size % " + "config::file_cache_each_block_size must be zero", + config::file_cache_each_block_size, config::s3_write_buffer_size); + exit(-1); + } + std::unordered_set<std::string> cache_path_set; + Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths); + if (!rest) { + LOG(FATAL) << "parse config file cache path failed, path=" + << doris::config::file_cache_path; + exit(-1); + } + std::vector<std::thread> file_cache_init_threads; - std::list<doris::Status> cache_status; - for (auto& cache_path : cache_paths) { - if (cache_path_set.find(cache_path.path) != cache_path_set.end()) { - LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path); - continue; - } + std::list<doris::Status> cache_status; + for (auto& cache_path : cache_paths) { + if (cache_path_set.find(cache_path.path) != cache_path_set.end()) { + LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path); + continue; + } - file_cache_init_threads.emplace_back([&, status = &cache_status.emplace_back()]() { - *status = doris::io::FileCacheFactory::instance()->create_file_cache( - cache_path.path, cache_path.init_settings()); - }); + file_cache_init_threads.emplace_back([&, status = &cache_status.emplace_back()]() { + *status = doris::io::FileCacheFactory::instance()->create_file_cache( + cache_path.path, cache_path.init_settings()); + }); - cache_path_set.emplace(cache_path.path); - } + cache_path_set.emplace(cache_path.path); + } - for (std::thread& thread : file_cache_init_threads) { - if (thread.joinable()) { - thread.join(); - } + for (std::thread& thread : file_cache_init_threads) { + if (thread.joinable()) { + thread.join(); } - for (const auto& status : cache_status) { - if (!status.ok()) { - LOG(FATAL) << "failed to init file cache, err: " << status; - exit(-1); - } + } + for (const auto& status : cache_status) { + if (!status.ok()) { + LOG(FATAL) << "failed to init file cache, err: " << status; + exit(-1); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org