This is an automated email from the ASF dual-hosted git repository. plat1ko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bb433c73257 [enhance](S3) Don't use aws's multithread utility in S3 FS to suite newer C++ compiler (#38539) bb433c73257 is described below commit bb433c73257bca314ce29f5cfa47d4a184fef0eb Author: AlexYue <yj976240...@gmail.com> AuthorDate: Wed Jul 31 13:23:28 2024 +0800 [enhance](S3) Don't use aws's multithread utility in S3 FS to suite newer C++ compiler (#38539) --- be/src/common/config.cpp | 3 ++- be/src/common/config.h | 3 ++- be/src/io/fs/s3_file_system.cpp | 23 ++++++++++------------- be/src/runtime/exec_env.h | 2 ++ be/src/runtime/exec_env_init.cpp | 6 ++++++ 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f984621ec85..d0a2a5fa7e3 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -938,7 +938,8 @@ DEFINE_mInt32(cold_data_compaction_interval_sec, "1800"); DEFINE_String(tmp_file_dir, "tmp"); -DEFINE_Int32(s3_transfer_executor_pool_size, "2"); +DEFINE_Int32(min_s3_file_system_thread_num, "16"); +DEFINE_Int32(max_s3_file_system_thread_num, "64"); DEFINE_Bool(enable_time_lut, "true"); DEFINE_mBool(enable_simdjson_reader, "true"); diff --git a/be/src/common/config.h b/be/src/common/config.h index fcfce74e7be..e117c824329 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -995,7 +995,8 @@ DECLARE_mInt32(confirm_unused_remote_files_interval_sec); DECLARE_Int32(cold_data_compaction_thread_num); DECLARE_mInt32(cold_data_compaction_interval_sec); -DECLARE_Int32(s3_transfer_executor_pool_size); +DECLARE_Int32(min_s3_file_system_thread_num); +DECLARE_Int32(max_s3_file_system_thread_num); DECLARE_Bool(enable_time_lut); DECLARE_mBool(enable_simdjson_reader); diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index 93f36429485..3905c4ddb1e 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -18,9 +18,8 @@ #include "io/fs/s3_file_system.h" #include <fmt/format.h> -#include <stddef.h> -#include <algorithm> +#include <cstddef> #include "common/compiler_util.h" // IWYU pragma: keep // IWYU pragma: no_include <bits/chrono.h> @@ -32,7 +31,6 @@ #include <fstream> // IWYU pragma: keep #include <future> #include <memory> -#include <sstream> #include "common/config.h" #include "common/logging.h" @@ -46,7 +44,7 @@ #include "io/fs/s3_file_reader.h" #include "io/fs/s3_file_writer.h" #include "io/fs/s3_obj_storage_client.h" -#include "util/bvar_helper.h" +#include "runtime/exec_env.h" #include "util/s3_uri.h" #include "util/s3_util.h" @@ -69,13 +67,6 @@ Result<std::string> get_key(const Path& full_path) { return uri.get_key(); } -// TODO(plat1ko): AwsTransferManager will be deprecated -std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>& default_executor() { - static auto executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>( - "default", config::s3_transfer_executor_pool_size); - return executor; -} - } // namespace ObjClientHolder::ObjClientHolder(S3ClientConf conf) : _conf(std::move(conf)) {} @@ -383,13 +374,19 @@ Status S3FileSystem::batch_upload_impl(const std::vector<Path>& local_files, return Status::OK(); }; + Status s = Status::OK(); std::vector<std::future<Status>> futures; for (int i = 0; i < local_files.size(); ++i) { auto task = std::make_shared<std::packaged_task<Status(size_t idx)>>(upload_task); futures.emplace_back(task->get_future()); - default_executor()->Submit([t = std::move(task), idx = i]() mutable { (*t)(idx); }); + auto st = ExecEnv::GetInstance()->s3_file_system_thread_pool()->submit_func( + [t = std::move(task), idx = i]() mutable { (*t)(idx); }); + // We shouldn't return immediately since the previous submitted tasks might still be running in the thread pool + if (!st.ok()) { + s = st; + break; + } } - Status s = Status::OK(); for (auto&& f : futures) { auto cur_s = f.get(); if (!cur_s.ok()) { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 89e5593c84b..65cf70bf568 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -204,6 +204,7 @@ public: ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); } ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); } ThreadPool* non_block_close_thread_pool(); + ThreadPool* s3_file_system_thread_pool() { return _s3_file_system_thread_pool.get(); } Status init_pipeline_task_scheduler(); void init_file_cache_factory(); @@ -381,6 +382,7 @@ private: // Pool to use a new thread to release object std::unique_ptr<ThreadPool> _lazy_release_obj_pool; std::unique_ptr<ThreadPool> _non_block_close_thread_pool; + std::unique_ptr<ThreadPool> _s3_file_system_thread_pool; FragmentMgr* _fragment_mgr = nullptr; pipeline::TaskScheduler* _without_group_task_scheduler = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 6740f548761..dd8af20715f 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -265,6 +265,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, .set_min_threads(config::min_nonblock_close_thread_num) .set_max_threads(config::max_nonblock_close_thread_num) .build(&_non_block_close_thread_pool)); + static_cast<void>(ThreadPoolBuilder("S3FileSystemThreadPool") + .set_min_threads(config::min_s3_file_system_thread_num) + .set_max_threads(config::max_s3_file_system_thread_num) + .build(&_s3_file_system_thread_pool)); // NOTE: runtime query statistics mgr could be visited by query and daemon thread // so it should be created before all query begin and deleted after all query and daemon thread stoppped @@ -675,6 +679,7 @@ void ExecEnv::destroy() { SAFE_SHUTDOWN(_join_node_thread_pool); SAFE_SHUTDOWN(_lazy_release_obj_pool); SAFE_SHUTDOWN(_non_block_close_thread_pool); + SAFE_SHUTDOWN(_s3_file_system_thread_pool); SAFE_SHUTDOWN(_send_report_thread_pool); SAFE_SHUTDOWN(_send_batch_thread_pool); @@ -720,6 +725,7 @@ void ExecEnv::destroy() { _join_node_thread_pool.reset(nullptr); _lazy_release_obj_pool.reset(nullptr); _non_block_close_thread_pool.reset(nullptr); + _s3_file_system_thread_pool.reset(nullptr); _send_report_thread_pool.reset(nullptr); _send_table_stats_thread_pool.reset(nullptr); _buffered_reader_prefetch_thread_pool.reset(nullptr); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org