This is an automated email from the ASF dual-hosted git repository.

plat1ko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bb433c73257 [enhance](S3) Don't use aws's multithread utility in S3 FS 
to suit newer C++ compiler (#38539)
bb433c73257 is described below

commit bb433c73257bca314ce29f5cfa47d4a184fef0eb
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Wed Jul 31 13:23:28 2024 +0800

    [enhance](S3) Don't use aws's multithread utility in S3 FS to suit newer 
C++ compiler (#38539)
---
 be/src/common/config.cpp         |  3 ++-
 be/src/common/config.h           |  3 ++-
 be/src/io/fs/s3_file_system.cpp  | 23 ++++++++++-------------
 be/src/runtime/exec_env.h        |  2 ++
 be/src/runtime/exec_env_init.cpp |  6 ++++++
 5 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f984621ec85..d0a2a5fa7e3 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -938,7 +938,8 @@ DEFINE_mInt32(cold_data_compaction_interval_sec, "1800");
 
 DEFINE_String(tmp_file_dir, "tmp");
 
-DEFINE_Int32(s3_transfer_executor_pool_size, "2");
+DEFINE_Int32(min_s3_file_system_thread_num, "16");
+DEFINE_Int32(max_s3_file_system_thread_num, "64");
 
 DEFINE_Bool(enable_time_lut, "true");
 DEFINE_mBool(enable_simdjson_reader, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index fcfce74e7be..e117c824329 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -995,7 +995,8 @@ DECLARE_mInt32(confirm_unused_remote_files_interval_sec);
 DECLARE_Int32(cold_data_compaction_thread_num);
 DECLARE_mInt32(cold_data_compaction_interval_sec);
 
-DECLARE_Int32(s3_transfer_executor_pool_size);
+DECLARE_Int32(min_s3_file_system_thread_num);
+DECLARE_Int32(max_s3_file_system_thread_num);
 
 DECLARE_Bool(enable_time_lut);
 DECLARE_mBool(enable_simdjson_reader);
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 93f36429485..3905c4ddb1e 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -18,9 +18,8 @@
 #include "io/fs/s3_file_system.h"
 
 #include <fmt/format.h>
-#include <stddef.h>
 
-#include <algorithm>
+#include <cstddef>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 // IWYU pragma: no_include <bits/chrono.h>
@@ -32,7 +31,6 @@
 #include <fstream> // IWYU pragma: keep
 #include <future>
 #include <memory>
-#include <sstream>
 
 #include "common/config.h"
 #include "common/logging.h"
@@ -46,7 +44,7 @@
 #include "io/fs/s3_file_reader.h"
 #include "io/fs/s3_file_writer.h"
 #include "io/fs/s3_obj_storage_client.h"
-#include "util/bvar_helper.h"
+#include "runtime/exec_env.h"
 #include "util/s3_uri.h"
 #include "util/s3_util.h"
 
@@ -69,13 +67,6 @@ Result<std::string> get_key(const Path& full_path) {
     return uri.get_key();
 }
 
-// TODO(plat1ko): AwsTransferManager will be deprecated
-std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>& 
default_executor() {
-    static auto executor = 
Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(
-            "default", config::s3_transfer_executor_pool_size);
-    return executor;
-}
-
 } // namespace
 
 ObjClientHolder::ObjClientHolder(S3ClientConf conf) : _conf(std::move(conf)) {}
@@ -383,13 +374,19 @@ Status S3FileSystem::batch_upload_impl(const 
std::vector<Path>& local_files,
         return Status::OK();
     };
 
+    Status s = Status::OK();
     std::vector<std::future<Status>> futures;
     for (int i = 0; i < local_files.size(); ++i) {
         auto task = std::make_shared<std::packaged_task<Status(size_t 
idx)>>(upload_task);
         futures.emplace_back(task->get_future());
-        default_executor()->Submit([t = std::move(task), idx = i]() mutable { 
(*t)(idx); });
+        auto st = 
ExecEnv::GetInstance()->s3_file_system_thread_pool()->submit_func(
+                [t = std::move(task), idx = i]() mutable { (*t)(idx); });
+        // We shouldn't return immediately since the previous submitted tasks 
might still be running in the thread pool
+        if (!st.ok()) {
+            s = st;
+            break;
+        }
     }
-    Status s = Status::OK();
     for (auto&& f : futures) {
         auto cur_s = f.get();
         if (!cur_s.ok()) {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 89e5593c84b..65cf70bf568 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -204,6 +204,7 @@ public:
     ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); 
}
     ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); 
}
     ThreadPool* non_block_close_thread_pool();
+    ThreadPool* s3_file_system_thread_pool() { return 
_s3_file_system_thread_pool.get(); }
 
     Status init_pipeline_task_scheduler();
     void init_file_cache_factory();
@@ -381,6 +382,7 @@ private:
     // Pool to use a new thread to release object
     std::unique_ptr<ThreadPool> _lazy_release_obj_pool;
     std::unique_ptr<ThreadPool> _non_block_close_thread_pool;
+    std::unique_ptr<ThreadPool> _s3_file_system_thread_pool;
 
     FragmentMgr* _fragment_mgr = nullptr;
     pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 6740f548761..dd8af20715f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -265,6 +265,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
                               
.set_min_threads(config::min_nonblock_close_thread_num)
                               
.set_max_threads(config::max_nonblock_close_thread_num)
                               .build(&_non_block_close_thread_pool));
+    static_cast<void>(ThreadPoolBuilder("S3FileSystemThreadPool")
+                              
.set_min_threads(config::min_s3_file_system_thread_num)
+                              
.set_max_threads(config::max_s3_file_system_thread_num)
+                              .build(&_s3_file_system_thread_pool));
 
     // NOTE: runtime query statistics mgr could be visited by query and daemon 
thread
     // so it should be created before all query begin and deleted after all 
query and daemon thread stoppped
@@ -675,6 +679,7 @@ void ExecEnv::destroy() {
     SAFE_SHUTDOWN(_join_node_thread_pool);
     SAFE_SHUTDOWN(_lazy_release_obj_pool);
     SAFE_SHUTDOWN(_non_block_close_thread_pool);
+    SAFE_SHUTDOWN(_s3_file_system_thread_pool);
     SAFE_SHUTDOWN(_send_report_thread_pool);
     SAFE_SHUTDOWN(_send_batch_thread_pool);
 
@@ -720,6 +725,7 @@ void ExecEnv::destroy() {
     _join_node_thread_pool.reset(nullptr);
     _lazy_release_obj_pool.reset(nullptr);
     _non_block_close_thread_pool.reset(nullptr);
+    _s3_file_system_thread_pool.reset(nullptr);
     _send_report_thread_pool.reset(nullptr);
     _send_table_stats_thread_pool.reset(nullptr);
     _buffered_reader_prefetch_thread_pool.reset(nullptr);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to