This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e4adf9b9314 [fix](routine load) replace heavy work pool with routine load thread pool for metadata fetching (#44907) (#46186) e4adf9b9314 is described below commit e4adf9b9314cd221cc40214909a2104a80d0381e Author: hui lai <lai...@selectdb.com> AuthorDate: Tue Dec 31 23:06:08 2024 +0800 [fix](routine load) replace heavy work pool with routine load thread pool for metadata fetching (#44907) (#46186) pick #44907 In production, we encountered an issue where the librdkafka consumer stucked during destruction, causing the heavy work pool to become saturated, which in turn made all heavy work pool-dependent functionalities, such as querying, unusable. To mitigate this impact, we replaced the heavy work pool with routine load threads for metadata fetching. --- be/src/runtime/routine_load/routine_load_task_executor.h | 2 ++ be/src/service/internal_service.cpp | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index 0e597d796c9..b1196f7824a 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -73,6 +73,8 @@ public: std::vector<PIntegerPair>* partition_offsets, int timeout); + ThreadPool& get_thread_pool() { return *_thread_pool; } + private: // execute the task void exec_task(std::shared_ptr<StreamLoadContext> ctx, DataConsumerPool* pool, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 701fc6c018d..ff29a7de2da 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1262,7 +1262,10 @@ void PInternalServiceImpl::report_stream_load_status(google::protobuf::RpcContro void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - bool ret = _heavy_work_pool.try_offer([this, request, response, done]() { + bool ret = _exec_env->routine_load_task_executor()->get_thread_pool().submit_func([this, + request, + response, + done]() { brpc::ClosureGuard closure_guard(done); // PProxyRequest is defined in gensrc/proto/internal_service.proto // Currently it supports 2 kinds of requests: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org