This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3d7163ea2cf [Enhancement](thread) FE fetch_data use brpc thread locally in BE (#44928) 3d7163ea2cf is described below commit 3d7163ea2cf0eaa13147f89c9c350141a2f24f9e Author: zclllhhjj <zhaochan...@selectdb.com> AuthorDate: Tue Dec 10 12:05:16 2024 +0800 [Enhancement](thread) FE fetch_data use brpc thread locally in BE (#44928) 1. do `fetch_data` in brpc thread locally 2. add more gauge for fragment async thread 3. fix wrong log info for FifoThreadPool --- be/src/runtime/buffer_control_block.cpp | 6 ++--- be/src/runtime/buffer_control_block.h | 10 ++++---- be/src/runtime/fragment_mgr.cpp | 21 +++++++---------- be/src/runtime/fragment_mgr.h | 3 +-- be/src/service/internal_service.cpp | 14 ++++------- be/src/util/doris_metrics.h | 1 + be/src/util/threadpool.h | 35 ++++++++++++++++------------ be/src/util/work_thread_pool.hpp | 14 +++++------ be/src/vec/sink/writer/async_result_writer.h | 6 ++--- 9 files changed, 53 insertions(+), 57 deletions(-) diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 8c1ae79955f..6f4427746f8 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -30,12 +30,10 @@ #include <utility> #include <vector> -#include "arrow/record_batch.h" #include "arrow/type_fwd.h" #include "pipeline/dependency.h" #include "runtime/thread_context.h" #include "util/runtime_profile.h" -#include "util/string_util.h" #include "util/thrift_util.h" #include "vec/core/block.h" @@ -149,8 +147,8 @@ void GetArrowResultBatchCtx::on_data( delete this; } -BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state) - : _fragment_id(id), +BufferControlBlock::BufferControlBlock(TUniqueId id, int buffer_size, RuntimeState* state) + : _fragment_id(std::move(id)), _is_close(false), _is_cancelled(false), _buffer_limit(buffer_size), diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 249e1ba7652..9060007232e 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -21,10 +21,10 @@ #include <cctz/time_zone.h> #include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/Types_types.h> -#include <stdint.h> #include <atomic> #include <condition_variable> +#include <cstdint> #include <deque> #include <list> #include <memory> @@ -34,7 +34,6 @@ #include "common/status.h" #include "runtime/query_statistics.h" #include "runtime/runtime_state.h" -#include "util/hash_util.hpp" namespace google::protobuf { class Closure; @@ -98,13 +97,15 @@ struct GetArrowResultBatchCtx { // buffer used for result customer and producer class BufferControlBlock { public: - BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state); + BufferControlBlock(TUniqueId id, int buffer_size, RuntimeState* state); ~BufferControlBlock(); Status init(); + // try to consume _waiting_rpc or make data waiting in _fe_result_batch_queue. try to combine block to reduce rpc first. Status add_batch(RuntimeState* state, std::unique_ptr<TFetchDataResult>& result); Status add_arrow_batch(RuntimeState* state, std::shared_ptr<vectorized::Block>& result); + // if there's Block waiting in _fe_result_batch_queue, send it(by on_data). otherwise make a rpc wait in _waiting_rpc. void get_batch(GetResultBatchCtx* ctx); // for ArrowFlightBatchLocalReader Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result, @@ -150,7 +151,7 @@ protected: const int _buffer_limit; int64_t _packet_num; - // blocking queue for batch + // Producer. blocking queue for result batch waiting to sent to FE by _waiting_rpc. FeResultQueue _fe_result_batch_queue; ArrowFlightResultQueue _arrow_flight_result_batch_queue; // for arrow flight @@ -163,6 +164,7 @@ protected: // TODO, waiting for data will block pipeline, so use a request pool to save requests waiting for data. std::condition_variable _arrow_data_arrival; + // Consumer. RPCs which FE waiting for result. when _fe_result_batch_queue filled, the rpc could be sent. std::deque<GetResultBatchCtx*> _waiting_rpc; std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f96e4152500..b1bc42491b5 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -34,17 +34,16 @@ #include <gen_cpp/Types_types.h> #include <gen_cpp/internal_service.pb.h> #include <pthread.h> -#include <stddef.h> #include <sys/time.h> #include <thrift/TApplicationException.h> #include <thrift/Thrift.h> #include <thrift/protocol/TDebugProtocol.h> #include <thrift/transport/TTransportException.h> -#include <time.h> #include <unistd.h> #include <algorithm> -#include <atomic> +#include <cstddef> +#include <ctime> #include "common/status.h" // IWYU pragma: no_include <bits/chrono.h> @@ -58,19 +57,16 @@ #include <unordered_set> #include <utility> -#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/object_pool.h" #include "common/utils.h" -#include "gutil/strings/substitute.h" #include "io/fs/stream_load_pipe.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/client_cache.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/frontend_info.h" -#include "runtime/memory/mem_tracker_limiter.h" #include "runtime/primitive_type.h" #include "runtime/query_context.h" #include "runtime/runtime_filter_mgr.h" @@ -89,24 +85,20 @@ #include "util/debug_points.h" #include "util/debug_util.h" #include "util/doris_metrics.h" -#include "util/hash_util.hpp" -#include "util/mem_info.h" #include "util/network_util.h" -#include "util/pretty_printer.h" #include "util/runtime_profile.h" #include "util/thread.h" #include "util/threadpool.h" #include "util/thrift_util.h" #include "util/uid_util.h" -#include "util/url_coding.h" #include "vec/runtime/shared_hash_table_controller.h" -#include "vec/runtime/vdatetime_value.h" namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_num_active_threads, MetricUnit::NOUNIT); bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare"); bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count"); @@ -184,7 +176,7 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info, } // Avoid logic error in frontend. - if (rpc_result.__isset.status == false || rpc_result.status.status_code != TStatusCode::OK) { + if (!rpc_result.__isset.status || rpc_result.status.status_code != TStatusCode::OK) { LOG_WARNING("Failed to fetch running queries from {}, reason: {}", PrintThriftNetworkAddress(fe_info.info.coordinator_address), doris::to_string(rpc_result.status.status_code)); @@ -193,7 +185,7 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info, doris::to_string(rpc_result.status.status_code)); } - if (rpc_result.__isset.running_queries == false) { + if (!rpc_result.__isset.running_queries) { return Status::InternalError("Failed to fetch running queries from {}, reason: {}", PrintThriftNetworkAddress(fe_info.info.coordinator_address), "running_queries is not set"); @@ -254,6 +246,8 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size, [this]() { return _thread_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads, + [this]() { return _thread_pool->num_active_threads(); }); CHECK(s.ok()) << s.to_string(); } @@ -262,6 +256,7 @@ FragmentMgr::~FragmentMgr() = default; void FragmentMgr::stop() { DEREGISTER_HOOK_METRIC(fragment_instance_count); DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads); _stop_background_threads_latch.count_down(); if (_cancel_thread) { _cancel_thread->join(); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 63d666788d0..e85fb07cba6 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -21,9 +21,8 @@ #include <gen_cpp/QueryPlanExtra_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/types.pb.h> -#include <stdint.h> -#include <condition_variable> +#include <cstdint> #include <functional> #include <iosfwd> #include <memory> diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 439f3f17faf..fb0b2f090bc 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -665,15 +665,11 @@ void PInternalService::cancel_plan_fragment(google::protobuf::RpcController* /*c void PInternalService::fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - bool ret = _heavy_work_pool.try_offer([this, controller, request, result, done]() { - brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); - GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); - _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); - }); - if (!ret) { - offer_failed(result, done, _heavy_work_pool); - return; - } + // fetch_data is a light operation which will put a request rather than wait inplace when there's no data ready. + // when there's data ready, use brpc to send. there's queue in brpc service. won't take it too long. + auto* cntl = static_cast<brpc::Controller*>(controller); + auto* ctx = new GetResultBatchCtx(cntl, result, done); + _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); } void PInternalService::fetch_arrow_data(google::protobuf::RpcController* controller, diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 69516773deb..31b907eec9e 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -202,6 +202,7 @@ public: UIntGauge* send_batch_thread_pool_thread_num = nullptr; UIntGauge* send_batch_thread_pool_queue_size = nullptr; UIntGauge* fragment_thread_pool_queue_size = nullptr; + UIntGauge* fragment_thread_pool_num_active_threads = nullptr; // Upload metrics UIntGauge* upload_total_byte = nullptr; diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index 9bd4a7246fb..f822c307aa6 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -20,12 +20,11 @@ #pragma once -#include <limits.h> -#include <stddef.h> - #include <boost/intrusive/detail/algo_type.hpp> #include <boost/intrusive/list.hpp> #include <boost/intrusive/list_hook.hpp> +#include <climits> +#include <cstddef> // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep #include <condition_variable> @@ -50,7 +49,7 @@ class ThreadPoolToken; class Runnable { public: virtual void run() = 0; - virtual ~Runnable() {} + virtual ~Runnable() = default; }; // ThreadPool takes a lot of arguments. We provide sane defaults with a builder. @@ -127,6 +126,9 @@ public: return Status::OK(); } + ThreadPoolBuilder(const ThreadPoolBuilder&) = delete; + void operator=(const ThreadPoolBuilder&) = delete; + private: friend class ThreadPool; const std::string _name; @@ -136,9 +138,6 @@ private: std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl; std::chrono::milliseconds _idle_timeout; - ThreadPoolBuilder(const ThreadPoolBuilder&) = delete; - void operator=(const ThreadPoolBuilder&) = delete; - template <typename T> static constexpr bool always_false_v = false; }; @@ -256,13 +255,22 @@ public: return _total_queued_tasks; } - std::vector<int> debug_info() { + std::vector<int> debug_info() const { std::lock_guard<std::mutex> l(_lock); std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads, _max_threads}; return arr; } + std::string get_info() const { + std::lock_guard<std::mutex> l(_lock); + return fmt::format("ThreadPool(name={}, threads(active/pending)=({}/{}), queued_task={})", + _name, _active_threads, _num_threads_pending_start, _total_queued_tasks); + } + + ThreadPool(const ThreadPool&) = delete; + void operator=(const ThreadPool&) = delete; + private: friend class ThreadPoolBuilder; friend class ThreadPoolToken; @@ -372,7 +380,7 @@ private: // // Protected by _lock. struct IdleThread : public boost::intrusive::list_base_hook<> { - explicit IdleThread() {} + explicit IdleThread() = default; // Condition variable for "queue is not empty". Waiters wake up when a new // task is queued. @@ -384,9 +392,6 @@ private: // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. std::unique_ptr<ThreadPoolToken> _tokenless; - - ThreadPool(const ThreadPool&) = delete; - void operator=(const ThreadPool&) = delete; }; // Entry point for token-based task submission and blocking for a particular @@ -434,6 +439,9 @@ public: return _entries.size(); } + ThreadPoolToken(const ThreadPoolToken&) = delete; + void operator=(const ThreadPoolToken&) = delete; + private: // All possible token states. Legal state transitions: // IDLE -> RUNNING: task is submitted via token @@ -516,9 +524,6 @@ private: int _num_submitted_tasks; // Number of tasks which has not been submitted to the thread pool's queue. int _num_unsubmitted_tasks; - - ThreadPoolToken(const ThreadPoolToken&) = delete; - void operator=(const ThreadPoolToken&) = delete; }; } // namespace doris diff --git a/be/src/util/work_thread_pool.hpp b/be/src/util/work_thread_pool.hpp index 00430ff7514..1da8a08f90d 100644 --- a/be/src/util/work_thread_pool.hpp +++ b/be/src/util/work_thread_pool.hpp @@ -18,7 +18,6 @@ #pragma once #include <mutex> -#include <thread> #include "util/blocking_priority_queue.hpp" #include "util/blocking_queue.hpp" @@ -126,12 +125,13 @@ public: } std::string get_info() const { - return fmt::format( - "PriorityThreadPool(name={}, queue_size={}/{}, active_thread={}/{}, " - "total_get_wait_time={}, total_put_wait_time={})", - _name, get_queue_size(), _work_queue.get_capacity(), _active_threads, - _threads.size(), _work_queue.total_get_wait_time(), - _work_queue.total_put_wait_time()); + return (Priority ? "PriorityThreadPool" : "FifoThreadPool") + + fmt::format( + "(name={}, queue_size={}/{}, active_thread={}/{}, " + "total_get_wait_time={}, total_put_wait_time={})", + _name, get_queue_size(), _work_queue.get_capacity(), _active_threads, + _threads.size(), _work_queue.total_get_wait_time(), + _work_queue.total_put_wait_time()); } protected: diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index 513f2aa7984..2a90dd2dbd0 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -19,7 +19,7 @@ #include <concurrentqueue.h> #include <condition_variable> -#include <queue> +#include <queue> // IWYU pragma: keep #include "runtime/result_writer.h" #include "vec/exprs/vexpr_fwd.h" @@ -49,7 +49,7 @@ class Block; * pipeline execution engine performance. * * The Sub class of AsyncResultWriter need to impl two virtual function - * * Status open() the first time IO work like: create file/ connect networking + * * Status open() the first time IO work like: create file/ connect network * * Status write() do the real IO work for block */ class AsyncResultWriter : public ResultWriter { @@ -64,7 +64,7 @@ public: virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0; - // sink the block date to date queue, it is async + // sink the block data to data queue, it is async Status sink(Block* block, bool eos); // Add the IO thread task process block() to thread pool to dispose the IO --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org