This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 453e3c18f4c [refactor](buffer) remove download buffer since it is no longer useful (#28832) 453e3c18f4c is described below commit 453e3c18f4c095fbcd99dc4d6f0be67a30a6b907 Author: zclllyybb <zhaochan...@selectdb.com> AuthorDate: Fri Dec 22 11:53:31 2023 +0800 [refactor](buffer) remove download buffer since it is no longer useful (#28832) remove download buffer since it is no longer useful --- be/src/common/config.cpp | 6 --- be/src/common/config.h | 6 --- be/src/runtime/exec_env.h | 26 +----------- be/src/runtime/exec_env_init.cpp | 53 +++++------------------- be/src/util/doris_metrics.h | 2 - docs/en/docs/admin-manual/config/be-config.md | 6 --- docs/zh-CN/docs/admin-manual/config/be-config.md | 6 --- 7 files changed, 12 insertions(+), 93 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 1241ae39e67..03eaee7b23c 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -792,12 +792,6 @@ DEFINE_Validator(max_send_batch_parallelism_per_job, DEFINE_Int32(send_batch_thread_pool_thread_num, "64"); // number of send batch thread pool queue size DEFINE_Int32(send_batch_thread_pool_queue_size, "102400"); -// number of download cache thread pool size -DEFINE_Int32(download_cache_thread_pool_thread_num, "48"); -// number of download cache thread pool queue size -DEFINE_Int32(download_cache_thread_pool_queue_size, "102400"); -// download cache buffer size -DEFINE_Int64(download_cache_buffer_size, "10485760"); // Limit the number of segment of a newly created rowset. // The newly created rowset may to be compacted after loading, diff --git a/be/src/common/config.h b/be/src/common/config.h index 856e2482745..e011073d44d 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -851,12 +851,6 @@ DECLARE_mInt32(max_send_batch_parallelism_per_job); DECLARE_Int32(send_batch_thread_pool_thread_num); // number of send batch thread pool queue size DECLARE_Int32(send_batch_thread_pool_queue_size); -// number of download cache thread pool size -DECLARE_Int32(download_cache_thread_pool_thread_num); -// number of download cache thread pool queue size -DECLARE_Int32(download_cache_thread_pool_queue_size); -// download cache buffer size -DECLARE_Int64(download_cache_buffer_size); // Limit the number of segment of a newly created rowset. // The newly created rowset may to be compacted after loading, diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index ce534c7ce04..95c4e97e1b3 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -18,10 +18,10 @@ #pragma once #include <common/multi_version.h> -#include <stddef.h> #include <algorithm> #include <atomic> +#include <cstddef> #include <map> #include <memory> #include <mutex> @@ -168,7 +168,6 @@ public: MemTracker* brpc_iobuf_block_memory_tracker() { return _brpc_iobuf_block_memory_tracker.get(); } ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); } - ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); } ThreadPool* buffered_reader_prefetch_thread_pool() { return _buffered_reader_prefetch_thread_pool.get(); } @@ -177,23 +176,8 @@ public: ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); } ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); } - void set_serial_download_cache_thread_token() { - _serial_download_cache_thread_token = - download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1); - } - ThreadPoolToken* get_serial_download_cache_thread_token() { - return _serial_download_cache_thread_token.get(); - } - void init_download_cache_buf(); - void init_download_cache_required_components(); Status init_pipeline_task_scheduler(); void init_file_cache_factory(); - char* get_download_cache_buf(ThreadPoolToken* token) { - if (_download_cache_buf_map.find(token) == _download_cache_buf_map.end()) { - return nullptr; - } - return _download_cache_buf_map[token].get(); - } io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; } UserFunctionCache* user_function_cache() { return _user_function_cache; } FragmentMgr* fragment_mgr() { return _fragment_mgr; } @@ -322,23 +306,17 @@ private: std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker; std::unique_ptr<ThreadPool> _send_batch_thread_pool; - - // Threadpool used to download cache from remote storage - std::unique_ptr<ThreadPool> _download_cache_thread_pool; // Threadpool used to prefetch remote file for buffered reader std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool; // Threadpool used to upload local file to s3 std::unique_ptr<ThreadPool> _s3_file_upload_thread_pool; - // A token used to submit download cache task serially - std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token; // Pool used by fragment manager to send profile or status to FE coordinator std::unique_ptr<ThreadPool> _send_report_thread_pool; // Pool used by join node to build hash table std::unique_ptr<ThreadPool> _join_node_thread_pool; // Pool to use a new thread to release object std::unique_ptr<ThreadPool> _lazy_release_obj_pool; - // ThreadPoolToken -> buffer - std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>> _download_cache_buf_map; + FragmentMgr* _fragment_mgr = nullptr; pipeline::TaskScheduler* _without_group_task_scheduler = nullptr; pipeline::TaskScheduler* _with_group_task_scheduler = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 13cb861aac2..4df69d67c03 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -17,14 +17,14 @@ // IWYU pragma: no_include <bthread/errno.h> #include <common/multi_version.h> -#include <errno.h> // IWYU pragma: keep #include <gen_cpp/HeartbeatService_types.h> #include <gen_cpp/Metrics_types.h> -#include <stdint.h> -#include <stdlib.h> -#include <string.h> #include <sys/resource.h> +#include <cerrno> // IWYU pragma: keep +#include <cstdint> +#include <cstdlib> +#include <cstring> #include <limits> #include <map> #include <memory> @@ -109,15 +109,13 @@ class PFunctionService_Stub; DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT); static void init_doris_metrics(const std::vector<StorePath>& store_paths) { bool init_system_metrics = config::enable_system_metrics; std::set<std::string> disk_devices; std::vector<std::string> network_interfaces; std::vector<std::string> paths; - for (auto& store_path : store_paths) { + for (const auto& store_path : store_paths) { paths.emplace_back(store_path.path); } if (init_system_metrics) { @@ -167,8 +165,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, .set_max_queue_size(config::send_batch_thread_pool_queue_size) .build(&_send_batch_thread_pool)); - init_download_cache_required_components(); - static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool") .set_min_threads(16) .set_max_threads(64) @@ -336,9 +332,11 @@ void ExecEnv::init_file_cache_factory() { continue; } - olap_res = file_cache_init_pool->submit_func(std::bind( - &io::FileCacheFactory::create_file_cache, _file_cache_factory, cache_path.path, - cache_path.init_settings(), &(cache_status.emplace_back()))); + olap_res = file_cache_init_pool->submit_func( + [this, capture0 = cache_path.path, capture1 = cache_path.init_settings(), + capture2 = &(cache_status.emplace_back())] { + _file_cache_factory->create_file_cache(capture0, capture1, capture2); + }); if (!olap_res.ok()) { LOG(FATAL) << "failed to init file cache, err: " << olap_res; @@ -355,7 +353,6 @@ void ExecEnv::init_file_cache_factory() { } } } - return; } Status ExecEnv::_init_mem_env() { @@ -488,43 +485,18 @@ void ExecEnv::init_mem_tracker() { std::make_shared<MemTracker>("IOBufBlockMemory", _orphan_mem_tracker_raw); } -void ExecEnv::init_download_cache_buf() { - std::unique_ptr<char[]> download_cache_buf(new char[config::download_cache_buffer_size]); - memset(download_cache_buf.get(), 0, config::download_cache_buffer_size); - _download_cache_buf_map[_serial_download_cache_thread_token.get()] = - std::move(download_cache_buf); -} - -void ExecEnv::init_download_cache_required_components() { - static_cast<void>(ThreadPoolBuilder("DownloadCacheThreadPool") - .set_min_threads(1) - .set_max_threads(config::download_cache_thread_pool_thread_num) - .set_max_queue_size(config::download_cache_thread_pool_queue_size) - .build(&_download_cache_thread_pool)); - set_serial_download_cache_thread_token(); - init_download_cache_buf(); -} - void ExecEnv::_register_metrics() { REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, [this]() { return _send_batch_thread_pool->num_threads(); }); REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size, [this]() { return _send_batch_thread_pool->get_queue_size(); }); - - REGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num, - [this]() { return _download_cache_thread_pool->num_threads(); }); - - REGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size, - [this]() { return _download_cache_thread_pool->get_queue_size(); }); } void ExecEnv::_deregister_metrics() { DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size); DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num); DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size); - DEREGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num); - DEREGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size); } // TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method. @@ -572,8 +544,6 @@ void ExecEnv::destroy() { SAFE_SHUTDOWN(_lazy_release_obj_pool); SAFE_SHUTDOWN(_send_report_thread_pool); SAFE_SHUTDOWN(_send_batch_thread_pool); - SAFE_SHUTDOWN(_serial_download_cache_thread_token); - SAFE_SHUTDOWN(_download_cache_thread_pool); // Free resource after threads are stopped. // Some threads are still running, like threads created by _new_load_stream_mgr ... @@ -645,9 +615,6 @@ void ExecEnv::destroy() { SAFE_DELETE(_external_scan_context_mgr); SAFE_DELETE(_user_function_cache); - _serial_download_cache_thread_token.reset(nullptr); - _download_cache_thread_pool.reset(nullptr); - // _heartbeat_flags must be destoried after staroge engine SAFE_DELETE(_heartbeat_flags); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index db52ceb405d..72ee81cd49b 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -194,8 +194,6 @@ public: UIntGauge* add_batch_task_queue_size = nullptr; UIntGauge* send_batch_thread_pool_thread_num = nullptr; UIntGauge* send_batch_thread_pool_queue_size = nullptr; - UIntGauge* download_cache_thread_pool_thread_num = nullptr; - UIntGauge* download_cache_thread_pool_queue_size = nullptr; UIntGauge* fragment_thread_pool_queue_size = nullptr; // Upload metrics diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 1dcf7dbb241..c55bc8d00b8 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -980,12 +980,6 @@ BaseCompaction:546859: * Description: Interval in milliseconds between memtable flush mgr refresh iterations * Default value: 100 -#### `download_cache_buffer_size` - -* Type: int64 -* Description: The size of the buffer used to receive data when downloading the cache. -* Default value: 10485760 - #### `zone_map_row_num_threshold` * Type: int32 diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 802279c4cd0..567c3c598c5 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -1005,12 +1005,6 @@ BaseCompaction:546859: * 描述:memtable主动下刷时刷新内存统计的周期(毫秒) * 默认值:100 -#### `download_cache_buffer_size` - -* 类型: int64 -* 描述: 下载缓存时用于接收数据的buffer的大小。 -* 默认值: 10485760 - #### `zone_map_row_num_threshold` * 类型: int32 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org