This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 453e3c18f4c [refactor](buffer) remove download buffer since it is no longer useful (#28832)
453e3c18f4c is described below

commit 453e3c18f4c095fbcd99dc4d6f0be67a30a6b907
Author: zclllyybb <zhaochan...@selectdb.com>
AuthorDate: Fri Dec 22 11:53:31 2023 +0800

    [refactor](buffer) remove download buffer since it is no longer useful (#28832)
    
    remove download buffer since it is no longer useful
---
 be/src/common/config.cpp                         |  6 ---
 be/src/common/config.h                           |  6 ---
 be/src/runtime/exec_env.h                        | 26 +-----------
 be/src/runtime/exec_env_init.cpp                 | 53 +++++-------------------
 be/src/util/doris_metrics.h                      |  2 -
 docs/en/docs/admin-manual/config/be-config.md    |  6 ---
 docs/zh-CN/docs/admin-manual/config/be-config.md |  6 ---
 7 files changed, 12 insertions(+), 93 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1241ae39e67..03eaee7b23c 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -792,12 +792,6 @@ DEFINE_Validator(max_send_batch_parallelism_per_job,
 DEFINE_Int32(send_batch_thread_pool_thread_num, "64");
 // number of send batch thread pool queue size
 DEFINE_Int32(send_batch_thread_pool_queue_size, "102400");
-// number of download cache thread pool size
-DEFINE_Int32(download_cache_thread_pool_thread_num, "48");
-// number of download cache thread pool queue size
-DEFINE_Int32(download_cache_thread_pool_queue_size, "102400");
-// download cache buffer size
-DEFINE_Int64(download_cache_buffer_size, "10485760");
 
 // Limit the number of segment of a newly created rowset.
 // The newly created rowset may to be compacted after loading,
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 856e2482745..e011073d44d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -851,12 +851,6 @@ DECLARE_mInt32(max_send_batch_parallelism_per_job);
 DECLARE_Int32(send_batch_thread_pool_thread_num);
 // number of send batch thread pool queue size
 DECLARE_Int32(send_batch_thread_pool_queue_size);
-// number of download cache thread pool size
-DECLARE_Int32(download_cache_thread_pool_thread_num);
-// number of download cache thread pool queue size
-DECLARE_Int32(download_cache_thread_pool_queue_size);
-// download cache buffer size
-DECLARE_Int64(download_cache_buffer_size);
 
 // Limit the number of segment of a newly created rowset.
 // The newly created rowset may to be compacted after loading,
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index ce534c7ce04..95c4e97e1b3 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -18,10 +18,10 @@
 #pragma once
 
 #include <common/multi_version.h>
-#include <stddef.h>
 
 #include <algorithm>
 #include <atomic>
+#include <cstddef>
 #include <map>
 #include <memory>
 #include <mutex>
@@ -168,7 +168,6 @@ public:
     MemTracker* brpc_iobuf_block_memory_tracker() { return 
_brpc_iobuf_block_memory_tracker.get(); }
 
     ThreadPool* send_batch_thread_pool() { return 
_send_batch_thread_pool.get(); }
-    ThreadPool* download_cache_thread_pool() { return 
_download_cache_thread_pool.get(); }
     ThreadPool* buffered_reader_prefetch_thread_pool() {
         return _buffered_reader_prefetch_thread_pool.get();
     }
@@ -177,23 +176,8 @@ public:
     ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); 
}
     ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); 
}
 
-    void set_serial_download_cache_thread_token() {
-        _serial_download_cache_thread_token =
-                
download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1);
-    }
-    ThreadPoolToken* get_serial_download_cache_thread_token() {
-        return _serial_download_cache_thread_token.get();
-    }
-    void init_download_cache_buf();
-    void init_download_cache_required_components();
     Status init_pipeline_task_scheduler();
     void init_file_cache_factory();
-    char* get_download_cache_buf(ThreadPoolToken* token) {
-        if (_download_cache_buf_map.find(token) == 
_download_cache_buf_map.end()) {
-            return nullptr;
-        }
-        return _download_cache_buf_map[token].get();
-    }
     io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; }
     UserFunctionCache* user_function_cache() { return _user_function_cache; }
     FragmentMgr* fragment_mgr() { return _fragment_mgr; }
@@ -322,23 +306,17 @@ private:
     std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
 
     std::unique_ptr<ThreadPool> _send_batch_thread_pool;
-
-    // Threadpool used to download cache from remote storage
-    std::unique_ptr<ThreadPool> _download_cache_thread_pool;
     // Threadpool used to prefetch remote file for buffered reader
     std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool;
     // Threadpool used to upload local file to s3
     std::unique_ptr<ThreadPool> _s3_file_upload_thread_pool;
-    // A token used to submit download cache task serially
-    std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
     // Pool used by fragment manager to send profile or status to FE 
coordinator
     std::unique_ptr<ThreadPool> _send_report_thread_pool;
     // Pool used by join node to build hash table
     std::unique_ptr<ThreadPool> _join_node_thread_pool;
     // Pool to use a new thread to release object
     std::unique_ptr<ThreadPool> _lazy_release_obj_pool;
-    // ThreadPoolToken -> buffer
-    std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>> 
_download_cache_buf_map;
+
     FragmentMgr* _fragment_mgr = nullptr;
     pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
     pipeline::TaskScheduler* _with_group_task_scheduler = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 13cb861aac2..4df69d67c03 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -17,14 +17,14 @@
 
 // IWYU pragma: no_include <bthread/errno.h>
 #include <common/multi_version.h>
-#include <errno.h> // IWYU pragma: keep
 #include <gen_cpp/HeartbeatService_types.h>
 #include <gen_cpp/Metrics_types.h>
-#include <stdint.h>
-#include <stdlib.h>
-#include <string.h>
 #include <sys/resource.h>
 
+#include <cerrno> // IWYU pragma: keep
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
 #include <limits>
 #include <map>
 #include <memory>
@@ -109,15 +109,13 @@ class PFunctionService_Stub;
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, 
MetricUnit::NOUNIT);
 
 static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
     bool init_system_metrics = config::enable_system_metrics;
     std::set<std::string> disk_devices;
     std::vector<std::string> network_interfaces;
     std::vector<std::string> paths;
-    for (auto& store_path : store_paths) {
+    for (const auto& store_path : store_paths) {
         paths.emplace_back(store_path.path);
     }
     if (init_system_metrics) {
@@ -167,8 +165,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
                               
.set_max_queue_size(config::send_batch_thread_pool_queue_size)
                               .build(&_send_batch_thread_pool));
 
-    init_download_cache_required_components();
-
     static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
                               .set_min_threads(16)
                               .set_max_threads(64)
@@ -336,9 +332,11 @@ void ExecEnv::init_file_cache_factory() {
                 continue;
             }
 
-            olap_res = file_cache_init_pool->submit_func(std::bind(
-                    &io::FileCacheFactory::create_file_cache, 
_file_cache_factory, cache_path.path,
-                    cache_path.init_settings(), 
&(cache_status.emplace_back())));
+            olap_res = file_cache_init_pool->submit_func(
+                    [this, capture0 = cache_path.path, capture1 = 
cache_path.init_settings(),
+                     capture2 = &(cache_status.emplace_back())] {
+                        _file_cache_factory->create_file_cache(capture0, 
capture1, capture2);
+                    });
 
             if (!olap_res.ok()) {
                 LOG(FATAL) << "failed to init file cache, err: " << olap_res;
@@ -355,7 +353,6 @@ void ExecEnv::init_file_cache_factory() {
             }
         }
     }
-    return;
 }
 
 Status ExecEnv::_init_mem_env() {
@@ -488,43 +485,18 @@ void ExecEnv::init_mem_tracker() {
             std::make_shared<MemTracker>("IOBufBlockMemory", 
_orphan_mem_tracker_raw);
 }
 
-void ExecEnv::init_download_cache_buf() {
-    std::unique_ptr<char[]> download_cache_buf(new 
char[config::download_cache_buffer_size]);
-    memset(download_cache_buf.get(), 0, config::download_cache_buffer_size);
-    _download_cache_buf_map[_serial_download_cache_thread_token.get()] =
-            std::move(download_cache_buf);
-}
-
-void ExecEnv::init_download_cache_required_components() {
-    static_cast<void>(ThreadPoolBuilder("DownloadCacheThreadPool")
-                              .set_min_threads(1)
-                              
.set_max_threads(config::download_cache_thread_pool_thread_num)
-                              
.set_max_queue_size(config::download_cache_thread_pool_queue_size)
-                              .build(&_download_cache_thread_pool));
-    set_serial_download_cache_thread_token();
-    init_download_cache_buf();
-}
-
 void ExecEnv::_register_metrics() {
     REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
                          [this]() { return 
_send_batch_thread_pool->num_threads(); });
 
     REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size,
                          [this]() { return 
_send_batch_thread_pool->get_queue_size(); });
-
-    REGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num,
-                         [this]() { return 
_download_cache_thread_pool->num_threads(); });
-
-    REGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size,
-                         [this]() { return 
_download_cache_thread_pool->get_queue_size(); });
 }
 
 void ExecEnv::_deregister_metrics() {
     DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
     DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
     DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
-    DEREGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num);
-    DEREGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size);
 }
 
 // TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a 
Stop method.
@@ -572,8 +544,6 @@ void ExecEnv::destroy() {
     SAFE_SHUTDOWN(_lazy_release_obj_pool);
     SAFE_SHUTDOWN(_send_report_thread_pool);
     SAFE_SHUTDOWN(_send_batch_thread_pool);
-    SAFE_SHUTDOWN(_serial_download_cache_thread_token);
-    SAFE_SHUTDOWN(_download_cache_thread_pool);
 
     // Free resource after threads are stopped.
     // Some threads are still running, like threads created by 
_new_load_stream_mgr ...
@@ -645,9 +615,6 @@ void ExecEnv::destroy() {
     SAFE_DELETE(_external_scan_context_mgr);
     SAFE_DELETE(_user_function_cache);
 
-    _serial_download_cache_thread_token.reset(nullptr);
-    _download_cache_thread_pool.reset(nullptr);
-
     // _heartbeat_flags must be destoried after staroge engine
     SAFE_DELETE(_heartbeat_flags);
 
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index db52ceb405d..72ee81cd49b 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -194,8 +194,6 @@ public:
     UIntGauge* add_batch_task_queue_size = nullptr;
     UIntGauge* send_batch_thread_pool_thread_num = nullptr;
     UIntGauge* send_batch_thread_pool_queue_size = nullptr;
-    UIntGauge* download_cache_thread_pool_thread_num = nullptr;
-    UIntGauge* download_cache_thread_pool_queue_size = nullptr;
     UIntGauge* fragment_thread_pool_queue_size = nullptr;
 
     // Upload metrics
diff --git a/docs/en/docs/admin-manual/config/be-config.md 
b/docs/en/docs/admin-manual/config/be-config.md
index 1dcf7dbb241..c55bc8d00b8 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -980,12 +980,6 @@ BaseCompaction:546859:
 * Description: Interval in milliseconds between memtable flush mgr refresh 
iterations
 * Default value: 100
 
-#### `download_cache_buffer_size`
-
-* Type: int64
-* Description: The size of the buffer used to receive data when downloading 
the cache.
-* Default value: 10485760
-
 #### `zone_map_row_num_threshold`
 
 * Type: int32
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md 
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 802279c4cd0..567c3c598c5 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -1005,12 +1005,6 @@ BaseCompaction:546859:
 * 描述:memtable主动下刷时刷新内存统计的周期(毫秒)
 * 默认值:100
 
-#### `download_cache_buffer_size`
-
-* 类型: int64
-* 描述: 下载缓存时用于接收数据的buffer的大小。
-* 默认值: 10485760
-
 #### `zone_map_row_num_threshold`
 
 * 类型: int32


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to