This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d7163ea2cf [Enhancement](thread) FE fetch_data use brpc thread locally in BE (#44928)
3d7163ea2cf is described below

commit 3d7163ea2cf0eaa13147f89c9c350141a2f24f9e
Author: zclllhhjj <zhaochan...@selectdb.com>
AuthorDate: Tue Dec 10 12:05:16 2024 +0800

    [Enhancement](thread) FE fetch_data use brpc thread locally in BE  (#44928)
    
    
    1. do `fetch_data` locally in the brpc worker thread in BE
    2. add more gauges for the fragment async thread pool
    3. fix wrong log info for FifoThreadPool
---
 be/src/runtime/buffer_control_block.cpp      |  6 ++---
 be/src/runtime/buffer_control_block.h        | 10 ++++----
 be/src/runtime/fragment_mgr.cpp              | 21 +++++++----------
 be/src/runtime/fragment_mgr.h                |  3 +--
 be/src/service/internal_service.cpp          | 14 ++++-------
 be/src/util/doris_metrics.h                  |  1 +
 be/src/util/threadpool.h                     | 35 ++++++++++++++++------------
 be/src/util/work_thread_pool.hpp             | 14 +++++------
 be/src/vec/sink/writer/async_result_writer.h |  6 ++---
 9 files changed, 53 insertions(+), 57 deletions(-)

diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index 8c1ae79955f..6f4427746f8 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -30,12 +30,10 @@
 #include <utility>
 #include <vector>
 
-#include "arrow/record_batch.h"
 #include "arrow/type_fwd.h"
 #include "pipeline/dependency.h"
 #include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
-#include "util/string_util.h"
 #include "util/thrift_util.h"
 #include "vec/core/block.h"
 
@@ -149,8 +147,8 @@ void GetArrowResultBatchCtx::on_data(
     delete this;
 }
 
-BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state)
-        : _fragment_id(id),
+BufferControlBlock::BufferControlBlock(TUniqueId id, int buffer_size, RuntimeState* state)
+        : _fragment_id(std::move(id)),
           _is_close(false),
           _is_cancelled(false),
           _buffer_limit(buffer_size),
diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index 249e1ba7652..9060007232e 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -21,10 +21,10 @@
 #include <cctz/time_zone.h>
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/Types_types.h>
-#include <stdint.h>
 
 #include <atomic>
 #include <condition_variable>
+#include <cstdint>
 #include <deque>
 #include <list>
 #include <memory>
@@ -34,7 +34,6 @@
 #include "common/status.h"
 #include "runtime/query_statistics.h"
 #include "runtime/runtime_state.h"
-#include "util/hash_util.hpp"
 
 namespace google::protobuf {
 class Closure;
@@ -98,13 +97,15 @@ struct GetArrowResultBatchCtx {
 // buffer used for result customer and producer
 class BufferControlBlock {
 public:
-    BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state);
+    BufferControlBlock(TUniqueId id, int buffer_size, RuntimeState* state);
     ~BufferControlBlock();
 
     Status init();
+    // Try to fulfill a pending RPC in _waiting_rpc, or leave the data waiting in _fe_result_batch_queue; tries to merge small batches first to reduce the number of RPCs.
     Status add_batch(RuntimeState* state, std::unique_ptr<TFetchDataResult>& result);
     Status add_arrow_batch(RuntimeState* state, std::shared_ptr<vectorized::Block>& result);
 
+    // If there is a Block waiting in _fe_result_batch_queue, send it (via on_data); otherwise park the RPC in _waiting_rpc.
     void get_batch(GetResultBatchCtx* ctx);
     // for ArrowFlightBatchLocalReader
     Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
@@ -150,7 +151,7 @@ protected:
     const int _buffer_limit;
     int64_t _packet_num;
 
-    // blocking queue for batch
+    // Producer side: blocking queue of result batches waiting to be sent to the FE via _waiting_rpc.
     FeResultQueue _fe_result_batch_queue;
     ArrowFlightResultQueue _arrow_flight_result_batch_queue;
     // for arrow flight
@@ -163,6 +164,7 @@ protected:
     // TODO, waiting for data will block pipeline, so use a request pool to save requests waiting for data.
     std::condition_variable _arrow_data_arrival;
 
+    // Consumer side: RPCs on which the FE is waiting for results; once _fe_result_batch_queue has data, the RPC can be answered.
     std::deque<GetResultBatchCtx*> _waiting_rpc;
     std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc;
 
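A minimal sketch of the producer/consumer handshake described by the new comments above, with hypothetical ResultBatch and RpcCtx types standing in for TFetchDataResult and GetResultBatchCtx (an illustration of the pattern, not the committed code):

#include <deque>
#include <memory>
#include <mutex>

struct ResultBatch {};                                        // stands in for TFetchDataResult
struct RpcCtx { void on_data(const ResultBatch& /*b*/) {} };  // stands in for GetResultBatchCtx

class BufferSketch {
public:
    // Producer side (add_batch): satisfy a parked RPC if one exists,
    // otherwise leave the batch waiting in the queue.
    void add_batch(std::unique_ptr<ResultBatch> batch) {
        std::lock_guard<std::mutex> l(_lock);
        if (!_waiting_rpc.empty()) {
            RpcCtx* ctx = _waiting_rpc.front();
            _waiting_rpc.pop_front();
            ctx->on_data(*batch);  // data arrives while an RPC is already waiting
        } else {
            _batch_queue.push_back(std::move(batch));
        }
    }

    // Consumer side (get_batch): answer from the queue if data is ready,
    // otherwise park the RPC until the producer delivers a batch.
    void get_batch(RpcCtx* ctx) {
        std::lock_guard<std::mutex> l(_lock);
        if (!_batch_queue.empty()) {
            ctx->on_data(*_batch_queue.front());
            _batch_queue.pop_front();
        } else {
            _waiting_rpc.push_back(ctx);
        }
    }

private:
    std::mutex _lock;
    std::deque<std::unique_ptr<ResultBatch>> _batch_queue;  // plays the role of _fe_result_batch_queue
    std::deque<RpcCtx*> _waiting_rpc;                       // plays the role of _waiting_rpc
};

Either side may arrive first: a batch with no waiting RPC is queued, and an RPC with no queued batch is parked until add_batch() runs.
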
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f96e4152500..b1bc42491b5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -34,17 +34,16 @@
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <pthread.h>
-#include <stddef.h>
 #include <sys/time.h>
 #include <thrift/TApplicationException.h>
 #include <thrift/Thrift.h>
 #include <thrift/protocol/TDebugProtocol.h>
 #include <thrift/transport/TTransportException.h>
-#include <time.h>
 #include <unistd.h>
 
 #include <algorithm>
-#include <atomic>
+#include <cstddef>
+#include <ctime>
 
 #include "common/status.h"
 // IWYU pragma: no_include <bits/chrono.h>
@@ -58,19 +57,16 @@
 #include <unordered_set>
 #include <utility>
 
-#include "cloud/config.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/object_pool.h"
 #include "common/utils.h"
-#include "gutil/strings/substitute.h"
 #include "io/fs/stream_load_pipe.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "runtime/client_cache.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/frontend_info.h"
-#include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/primitive_type.h"
 #include "runtime/query_context.h"
 #include "runtime/runtime_filter_mgr.h"
@@ -89,24 +85,20 @@
 #include "util/debug_points.h"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
-#include "util/hash_util.hpp"
-#include "util/mem_info.h"
 #include "util/network_util.h"
-#include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
 #include "util/thread.h"
 #include "util/threadpool.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
-#include "util/url_coding.h"
 #include "vec/runtime/shared_hash_table_controller.h"
-#include "vec/runtime/vdatetime_value.h"
 
 namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_num_active_threads, MetricUnit::NOUNIT);
 bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
 
 bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
@@ -184,7 +176,7 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
     }
 
     // Avoid logic error in frontend.
-    if (rpc_result.__isset.status == false || rpc_result.status.status_code != TStatusCode::OK) {
+    if (!rpc_result.__isset.status || rpc_result.status.status_code != TStatusCode::OK) {
         LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
                     doris::to_string(rpc_result.status.status_code));
@@ -193,7 +185,7 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
                                      doris::to_string(rpc_result.status.status_code));
     }
 
-    if (rpc_result.__isset.running_queries == false) {
+    if (!rpc_result.__isset.running_queries) {
         return Status::InternalError("Failed to fetch running queries from {}, 
reason: {}",
                                      
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
                                      "running_queries is not set");
@@ -254,6 +246,8 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
 
     REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
                          [this]() { return _thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads,
+                         [this]() { return _thread_pool->num_active_threads(); });
     CHECK(s.ok()) << s.to_string();
 }
 
@@ -262,6 +256,7 @@ FragmentMgr::~FragmentMgr() = default;
 void FragmentMgr::stop() {
     DEREGISTER_HOOK_METRIC(fragment_instance_count);
     DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads);
     _stop_background_threads_latch.count_down();
     if (_cancel_thread) {
         _cancel_thread->join();
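
The two REGISTER_HOOK_METRIC calls above register hook-style gauges: the metric keeps a callback and reads the pool's live counter whenever the registry is scraped, so nothing has to push updates. A standalone sketch of that idea (HookGauge and PoolStats are illustrative names, not the Doris metrics API):

#include <atomic>
#include <cstdint>
#include <functional>
#include <iostream>

struct PoolStats {
    std::atomic<int64_t> active_threads{0};  // updated by the pool as workers start/finish
};

// A "hook gauge" stores a callback and evaluates it when the value is read.
class HookGauge {
public:
    explicit HookGauge(std::function<uint64_t()> hook) : _hook(std::move(hook)) {}
    uint64_t value() const { return _hook(); }

private:
    std::function<uint64_t()> _hook;
};

int main() {
    PoolStats pool;
    pool.active_threads = 3;

    // Analogous to REGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads, ...):
    // the gauge pulls the live value instead of being pushed to.
    HookGauge active_threads([&pool]() { return static_cast<uint64_t>(pool.active_threads.load()); });

    std::cout << active_threads.value() << std::endl;  // prints 3
    return 0;
}
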
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 63d666788d0..e85fb07cba6 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -21,9 +21,8 @@
 #include <gen_cpp/QueryPlanExtra_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/types.pb.h>
-#include <stdint.h>
 
-#include <condition_variable>
+#include <cstdint>
 #include <functional>
 #include <iosfwd>
 #include <memory>
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 439f3f17faf..fb0b2f090bc 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -665,15 +665,11 @@ void PInternalService::cancel_plan_fragment(google::protobuf::RpcController* /*c
 void PInternalService::fetch_data(google::protobuf::RpcController* controller,
                                   const PFetchDataRequest* request, PFetchDataResult* result,
                                   google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.try_offer([this, controller, request, result, done]() {
-        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-        GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
-        _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
-    });
-    if (!ret) {
-        offer_failed(result, done, _heavy_work_pool);
-        return;
-    }
+    // fetch_data is a lightweight operation: when no data is ready it only registers the request instead of waiting in place.
+    // When data is ready it is sent via brpc, whose service queue absorbs the work, so this does not hold the thread for long.
+    auto* cntl = static_cast<brpc::Controller*>(controller);
+    auto* ctx = new GetResultBatchCtx(cntl, result, done);
+    _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
 }
 
 void PInternalService::fetch_arrow_data(google::protobuf::RpcController* controller,
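
The old and new handler bodies above differ in where the work runs: offering the closure to a bounded heavy work pool can be rejected under load (the removed offer_failed path), while the short, non-blocking registration can safely run on the brpc worker thread itself. A toy illustration of that tradeoff, with a hypothetical BoundedPool that is not Doris code (it runs the task inline for brevity, where a real pool would queue it for a worker):

#include <functional>
#include <iostream>

// Stand-in for a bounded work pool whose try_offer() may reject under load.
struct BoundedPool {
    bool try_offer(const std::function<void()>& task) {
        if (full) return false;  // caller must handle the rejection
        task();
        return true;
    }
    bool full = false;
};

void handle_offloaded(BoundedPool& pool) {
    if (!pool.try_offer([] { std::cout << "registered fetch ctx" << std::endl; })) {
        std::cout << "rejected: pool is full" << std::endl;  // the old fetch_data failure path
    }
}

void handle_inline() {
    // New style: the work is just bookkeeping (park the RPC or send an already
    // buffered batch), so it runs directly on the RPC thread and cannot be rejected.
    std::cout << "registered fetch ctx" << std::endl;
}

int main() {
    BoundedPool pool;
    handle_offloaded(pool);  // succeeds
    pool.full = true;
    handle_offloaded(pool);  // rejected
    handle_inline();         // always succeeds
    return 0;
}
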
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 69516773deb..31b907eec9e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -202,6 +202,7 @@ public:
     UIntGauge* send_batch_thread_pool_thread_num = nullptr;
     UIntGauge* send_batch_thread_pool_queue_size = nullptr;
     UIntGauge* fragment_thread_pool_queue_size = nullptr;
+    UIntGauge* fragment_thread_pool_num_active_threads = nullptr;
 
     // Upload metrics
     UIntGauge* upload_total_byte = nullptr;
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 9bd4a7246fb..f822c307aa6 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -20,12 +20,11 @@
 
 #pragma once
 
-#include <limits.h>
-#include <stddef.h>
-
 #include <boost/intrusive/detail/algo_type.hpp>
 #include <boost/intrusive/list.hpp>
 #include <boost/intrusive/list_hook.hpp>
+#include <climits>
+#include <cstddef>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
 #include <condition_variable>
@@ -50,7 +49,7 @@ class ThreadPoolToken;
 class Runnable {
 public:
     virtual void run() = 0;
-    virtual ~Runnable() {}
+    virtual ~Runnable() = default;
 };
 
 // ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
@@ -127,6 +126,9 @@ public:
         return Status::OK();
     }
 
+    ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
+    void operator=(const ThreadPoolBuilder&) = delete;
+
 private:
     friend class ThreadPool;
     const std::string _name;
@@ -136,9 +138,6 @@ private:
     std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
     std::chrono::milliseconds _idle_timeout;
 
-    ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
-    void operator=(const ThreadPoolBuilder&) = delete;
-
     template <typename T>
     static constexpr bool always_false_v = false;
 };
@@ -256,13 +255,22 @@ public:
         return _total_queued_tasks;
     }
 
-    std::vector<int> debug_info() {
+    std::vector<int> debug_info() const {
         std::lock_guard<std::mutex> l(_lock);
         std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads,
                                 _max_threads};
         return arr;
     }
 
+    std::string get_info() const {
+        std::lock_guard<std::mutex> l(_lock);
+        return fmt::format("ThreadPool(name={}, 
threads(active/pending)=({}/{}), queued_task={})",
+                           _name, _active_threads, _num_threads_pending_start, 
_total_queued_tasks);
+    }
+
+    ThreadPool(const ThreadPool&) = delete;
+    void operator=(const ThreadPool&) = delete;
+
 private:
     friend class ThreadPoolBuilder;
     friend class ThreadPoolToken;
@@ -372,7 +380,7 @@ private:
     //
     // Protected by _lock.
     struct IdleThread : public boost::intrusive::list_base_hook<> {
-        explicit IdleThread() {}
+        explicit IdleThread() = default;
 
         // Condition variable for "queue is not empty". Waiters wake up when a new
         // task is queued.
@@ -384,9 +392,6 @@ private:
 
     // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
     std::unique_ptr<ThreadPoolToken> _tokenless;
-
-    ThreadPool(const ThreadPool&) = delete;
-    void operator=(const ThreadPool&) = delete;
 };
 
 // Entry point for token-based task submission and blocking for a particular
@@ -434,6 +439,9 @@ public:
         return _entries.size();
     }
 
+    ThreadPoolToken(const ThreadPoolToken&) = delete;
+    void operator=(const ThreadPoolToken&) = delete;
+
 private:
     // All possible token states. Legal state transitions:
     //   IDLE      -> RUNNING: task is submitted via token
@@ -516,9 +524,6 @@ private:
     int _num_submitted_tasks;
     // Number of tasks which has not been submitted to the thread pool's queue.
     int _num_unsubmitted_tasks;
-
-    ThreadPoolToken(const ThreadPoolToken&) = delete;
-    void operator=(const ThreadPoolToken&) = delete;
 };
 
 } // namespace doris
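
For context on the comment above ("ThreadPool takes a lot of arguments. We provide sane defaults with a builder."), a typical usage sketch; the setter names set_min_threads/set_max_threads/build and ThreadPool::submit_func are assumed from common usage of this Kudu-derived pool and are not shown in this diff:

#include <memory>

#include "common/status.h"
#include "util/threadpool.h"

doris::Status start_pool_example() {
    std::unique_ptr<doris::ThreadPool> pool;
    RETURN_IF_ERROR(doris::ThreadPoolBuilder("example_pool")
                            .set_min_threads(2)
                            .set_max_threads(8)
                            .build(&pool));

    // Submit a trivial task; the get_info() added in this commit can then be used to
    // log active/pending thread counts and the number of queued tasks.
    RETURN_IF_ERROR(pool->submit_func([]() { /* do work */ }));

    pool->shutdown();
    return doris::Status::OK();
}
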
diff --git a/be/src/util/work_thread_pool.hpp b/be/src/util/work_thread_pool.hpp
index 00430ff7514..1da8a08f90d 100644
--- a/be/src/util/work_thread_pool.hpp
+++ b/be/src/util/work_thread_pool.hpp
@@ -18,7 +18,6 @@
 #pragma once
 
 #include <mutex>
-#include <thread>
 
 #include "util/blocking_priority_queue.hpp"
 #include "util/blocking_queue.hpp"
@@ -126,12 +125,13 @@ public:
     }
 
     std::string get_info() const {
-        return fmt::format(
-                "PriorityThreadPool(name={}, queue_size={}/{}, active_thread={}/{}, "
-                "total_get_wait_time={}, total_put_wait_time={})",
-                _name, get_queue_size(), _work_queue.get_capacity(), _active_threads,
-                _threads.size(), _work_queue.total_get_wait_time(),
-                _work_queue.total_put_wait_time());
+        return (Priority ? "PriorityThreadPool" : "FifoThreadPool") +
+               fmt::format(
+                       "(name={}, queue_size={}/{}, active_thread={}/{}, "
+                       "total_get_wait_time={}, total_put_wait_time={})",
+                       _name, get_queue_size(), _work_queue.get_capacity(), _active_threads,
+                       _threads.size(), _work_queue.total_get_wait_time(),
+                       _work_queue.total_put_wait_time());
     }
 
 protected:
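
get_info() above now derives the reported name from the compile-time Priority flag instead of hard-coding "PriorityThreadPool", which is the FifoThreadPool log fix from the commit message. A minimal illustration of the same pattern with an invented PoolSketch template (not the real WorkThreadPool):

#include <iostream>
#include <string>

#include <fmt/format.h>

template <bool Priority>
struct PoolSketch {
    std::string get_info() const {
        // The ternary is decided by the template flag, so a FIFO instance no
        // longer mislabels itself as a priority pool in logs.
        return (Priority ? "PriorityThreadPool" : "FifoThreadPool") +
               fmt::format("(name={}, queued={})", name, queued);
    }
    std::string name = "example";
    int queued = 0;
};

int main() {
    std::cout << PoolSketch<true>{}.get_info() << std::endl;   // PriorityThreadPool(name=example, queued=0)
    std::cout << PoolSketch<false>{}.get_info() << std::endl;  // FifoThreadPool(name=example, queued=0)
    return 0;
}
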
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 513f2aa7984..2a90dd2dbd0 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -19,7 +19,7 @@
 #include <concurrentqueue.h>
 
 #include <condition_variable>
-#include <queue>
+#include <queue> // IWYU pragma: keep
 
 #include "runtime/result_writer.h"
 #include "vec/exprs/vexpr_fwd.h"
@@ -49,7 +49,7 @@ class Block;
  *  pipeline execution engine performance.
  *
  *  The Sub class of AsyncResultWriter need to impl two virtual function
- *     * Status open() the first time IO work like: create file/ connect networking
+ *     * Status open() the first time IO work like: create file/ connect network
  *     * Status write() do the real IO work for block 
  */
 class AsyncResultWriter : public ResultWriter {
@@ -64,7 +64,7 @@ public:
 
     virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
 
-    // sink the block date to date queue, it is async
+    // sink the block data to data queue, it is async
     Status sink(Block* block, bool eos);
 
     // Add the IO thread task process block() to thread pool to dispose the IO


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
