This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f48cfd31ac [fix](routine load) replace heavy work pool with routine 
load thread pool for metadata fetching (#44907)
0f48cfd31ac is described below

commit 0f48cfd31ac749db9075b799ba6c815c401d2aeb
Author: hui lai <lai...@selectdb.com>
AuthorDate: Thu Dec 5 15:42:26 2024 +0800

    [fix](routine load) replace heavy work pool with routine load thread pool 
for metadata fetching (#44907)
    
    In production, we encountered an issue where the librdkafka consumer
    got stuck during destruction, causing the heavy work pool to become
    saturated, which in turn made all functionality that depends on the
    heavy work pool, such as querying, unusable. To mitigate this impact, we
    replaced the heavy work pool with the routine load thread pool for
    metadata fetching.
---
 be/src/runtime/routine_load/routine_load_task_executor.h | 2 ++
 be/src/service/internal_service.cpp                      | 5 ++++-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h 
b/be/src/runtime/routine_load/routine_load_task_executor.h
index 0e597d796c9..b1196f7824a 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -73,6 +73,8 @@ public:
                                                  std::vector<PIntegerPair>* 
partition_offsets,
                                                  int timeout);
 
+    ThreadPool& get_thread_pool() { return *_thread_pool; }
+
 private:
     // execute the task
     void exec_task(std::shared_ptr<StreamLoadContext> ctx, DataConsumerPool* 
pool,
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index adcd07e7de7..439f3f17faf 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1240,7 +1240,10 @@ void 
PInternalService::report_stream_load_status(google::protobuf::RpcController
 void PInternalService::get_info(google::protobuf::RpcController* controller,
                                 const PProxyRequest* request, PProxyResult* 
response,
                                 google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
+    bool ret = 
_exec_env->routine_load_task_executor()->get_thread_pool().submit_func([this,
+                                                                               
        request,
+                                                                               
        response,
+                                                                               
        done]() {
         brpc::ClosureGuard closure_guard(done);
         // PProxyRequest is defined in gensrc/proto/internal_service.proto
         // Currently it supports 2 kinds of requests:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to