This is an automated email from the ASF dual-hosted git repository.

linzhongcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e3be4eff5 [Improvement](brpc) Using a thread pool for RPC service 
avoiding std::mutex block brpc::bthread (#16639)
0e3be4eff5 is described below

commit 0e3be4eff59b8e4e3d21791a8bb7e8cb496d2f2f
Author: chenlinzhong <490103...@qq.com>
AuthorDate: Wed Feb 22 14:15:47 2023 +0800

    [Improvement](brpc) Using a thread pool for RPC service avoiding std::mutex 
block brpc::bthread (#16639)
    
    mainly include:
    - brpc service adds two types of thread pools. The number of "light" and 
"heavy" thread pools is different
    Classify the interfaces of be. Those related to data transmission are 
classified as heavy interfaces and others as light interfaces
    - Add some monitoring to the thread pool, including the queue size and the 
number of active threads. Use these
    - indicators to guide the configuration of the number of threads
---
 be/src/common/config.h                             |  14 +-
 be/src/service/internal_service.cpp                | 953 +++++++++++++--------
 be/src/service/internal_service.h                  |   9 +-
 be/src/util/doris_metrics.h                        |  10 +
 be/src/util/priority_thread_pool.hpp               |   6 +-
 .../maint-monitor/monitor-metrics/metrics.md       |   6 +
 gensrc/proto/internal_service.proto                |   1 +
 7 files changed, 637 insertions(+), 362 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8516ea4fa9..f15d30bc7d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060");
 // port for brpc
 CONF_Int32(brpc_port, "8060");
 
-// the number of bthreads for brpc, the default value is set to -1, which 
means the number of bthreads is #cpu-cores
+// the number of bthreads for brpc, the default value is set to -1,
+// which means the number of bthreads is #cpu-cores
 CONF_Int32(brpc_num_threads, "-1");
 
 // port to brpc server for single replica load
@@ -388,8 +389,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
 CONF_Int64(load_data_reserve_hours, "4");
 // log error log will be removed after this time
 CONF_mInt64(load_error_log_reserve_hours, "48");
-CONF_Int32(number_tablet_writer_threads, "16");
-CONF_Int32(number_slave_replica_download_threads, "64");
+
+// be brpc interface is classified into two categories: light and heavy
+// each category has diffrent thread number
+// threads to handle heavy api interface, such as transmit_data/transmit_block 
etc
+CONF_Int32(brpc_heavy_work_pool_threads, "192");
+// threads to handle light api interface, such as 
exec_plan_fragment_prepare/exec_plan_fragment_start
+CONF_Int32(brpc_light_work_pool_threads, "32");
+CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
+CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");
 
 // The maximum amount of data that can be processed by a stream load
 CONF_mInt64(streaming_load_max_mb, "10240");
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index a8ed5ecfd8..3b271e4152 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -70,7 +70,15 @@ using namespace ErrorCode;
 
 const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
 
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, 
MetricUnit::NOUNIT);
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
 
 bthread_key_t btls_key;
 
@@ -104,32 +112,58 @@ private:
 
 PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env),
-          _tablet_worker_pool(config::number_tablet_writer_threads, 10240, 
"tablet_writer"),
-          
_slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240,
-                                     "replica_download") {
-    REGISTER_HOOK_METRIC(add_batch_task_queue_size,
-                         [this]() { return 
_tablet_worker_pool.get_queue_size(); });
+          _heavy_work_pool(config::brpc_heavy_work_pool_threads,
+                           config::brpc_heavy_work_pool_max_queue_size, 
"brpc_heavy"),
+          _light_work_pool(config::brpc_light_work_pool_threads,
+                           config::brpc_light_work_pool_max_queue_size, 
"brpc_light") {
+    REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
+                         [this]() { return _heavy_work_pool.get_queue_size(); 
});
+    REGISTER_HOOK_METRIC(light_work_pool_queue_size,
+                         [this]() { return _light_work_pool.get_queue_size(); 
});
+    REGISTER_HOOK_METRIC(heavy_work_active_threads,
+                         [this]() { return 
_heavy_work_pool.get_active_threads(); });
+    REGISTER_HOOK_METRIC(light_work_active_threads,
+                         [this]() { return 
_light_work_pool.get_active_threads(); });
+
+    REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size,
+                         []() { return 
config::brpc_heavy_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(light_work_pool_max_queue_size,
+                         []() { return 
config::brpc_light_work_pool_max_queue_size; });
+    REGISTER_HOOK_METRIC(heavy_work_max_threads,
+                         []() { return config::brpc_heavy_work_pool_threads; 
});
+    REGISTER_HOOK_METRIC(light_work_max_threads,
+                         []() { return config::brpc_light_work_pool_threads; 
});
+
     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
     CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, 
AsyncIO::io_ctx_key_deleter));
 }
 
 PInternalServiceImpl::~PInternalServiceImpl() {
-    DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
+    DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(light_work_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(heavy_work_active_threads);
+    DEREGISTER_HOOK_METRIC(light_work_active_threads);
+
+    DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size);
+    DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size);
+    DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
+    DEREGISTER_HOOK_METRIC(light_work_max_threads);
+
     CHECK_EQ(0, bthread_key_delete(btls_key));
     CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key));
 }
 
-void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* 
controller,
                                          const PTransmitDataParams* request,
                                          PTransmitDataResult* response,
                                          google::protobuf::Closure* done) {}
 
-void 
PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* 
controller,
                                                  const PEmptyRequest* request,
                                                  PTransmitDataResult* response,
                                                  google::protobuf::Closure* 
done) {}
 
-void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* 
controller,
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done,
@@ -139,22 +173,31 @@ void 
PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
                                               const PTabletWriterOpenRequest* 
request,
                                               PTabletWriterOpenResult* 
response,
                                               google::protobuf::Closure* done) 
{
-    VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << 
request->index_id()
-             << ", txn_id=" << request->txn_id();
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->load_channel_mgr()->open(*request);
-    if (!st.ok()) {
-        LOG(WARNING) << "load channel open failed, message=" << st << ", id=" 
<< request->id()
-                     << ", index_id=" << request->index_id() << ", txn_id=" << 
request->txn_id();
+    bool ret = _light_work_pool.offer([this, request, response, done]() {
+        VLOG_RPC << "tablet writer open, id=" << request->id()
+                 << ", index_id=" << request->index_id() << ", txn_id=" << 
request->txn_id();
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->load_channel_mgr()->open(*request);
+        if (!st.ok()) {
+            LOG(WARNING) << "load channel open failed, message=" << st << ", 
id=" << request->id()
+                         << ", index_id=" << request->index_id()
+                         << ", txn_id=" << request->txn_id();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    st.to_protobuf(response->mutable_status());
 }
 
-void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* 
controller,
                                               const PExecPlanFragmentRequest* 
request,
                                               PExecPlanFragmentResult* 
response,
                                               google::protobuf::Closure* done) 
{
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment", 
cntl_base);
+    auto span = telemetry::start_rpc_server_span("exec_plan_fragment", 
controller);
     auto scope = OpentelemetryScope {span};
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
@@ -168,67 +211,95 @@ void 
PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
     st.to_protobuf(response->mutable_status());
 }
 
-void 
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController*
 cntl_base,
+void 
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController*
 controller,
                                                       const 
PExecPlanFragmentRequest* request,
                                                       PExecPlanFragmentResult* 
response,
                                                       
google::protobuf::Closure* done) {
-    exec_plan_fragment(cntl_base, request, response, done);
+    bool ret = _light_work_pool.offer([this, controller, request, response, 
done]() {
+        exec_plan_fragment(controller, request, response, done);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
 void 
PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* 
controller,
                                                     const 
PExecPlanFragmentStartRequest* request,
                                                     PExecPlanFragmentResult* 
result,
                                                     google::protobuf::Closure* 
done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", 
controller);
-    auto scope = OpentelemetryScope {span};
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->fragment_mgr()->start_query_execution(request);
-    st.to_protobuf(result->mutable_status());
+    bool ret = _light_work_pool.offer([this, controller, request, result, 
done]() {
+        auto span = 
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+        auto scope = OpentelemetryScope {span};
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->fragment_mgr()->start_query_execution(request);
+        st.to_protobuf(result->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        result->mutable_status()->add_error_msgs("fail to offer request to the 
work pool");
+    }
 }
 
-void 
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* 
controller,
                                                    const 
PTabletWriterAddBlockRequest* request,
                                                    
PTabletWriterAddBlockResult* response,
                                                    google::protobuf::Closure* 
done) {
-    // TODO(zxy) delete in 1.2 version
-    google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, 
cntl);
+    bool ret = _heavy_work_pool.offer([this, controller, request, response, 
done]() {
+        // TODO(zxy) delete in 1.2 version
+        google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        
attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
 
-    _tablet_writer_add_block(cntl_base, request, response, new_done);
+        _tablet_writer_add_block(controller, request, response, new_done);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
 void PInternalServiceImpl::tablet_writer_add_block_by_http(
-        google::protobuf::RpcController* cntl_base, const 
::doris::PEmptyRequest* request,
+        google::protobuf::RpcController* controller, const 
::doris::PEmptyRequest* request,
         PTabletWriterAddBlockResult* response, google::protobuf::Closure* 
done) {
-    PTabletWriterAddBlockRequest* new_request = new 
PTabletWriterAddBlockRequest();
-    google::protobuf::Closure* new_done =
-            new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, 
done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    Status st = 
attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
-                                                                               
        cntl);
-    if (st.ok()) {
-        _tablet_writer_add_block(cntl_base, new_request, response, new_done);
-    } else {
-        st.to_protobuf(response->mutable_status());
+    bool ret = _heavy_work_pool.offer([this, controller, response, done]() {
+        PTabletWriterAddBlockRequest* new_request = new 
PTabletWriterAddBlockRequest();
+        google::protobuf::Closure* new_done =
+                new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, 
done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        Status st = 
attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(
+                new_request, cntl);
+        if (st.ok()) {
+            _tablet_writer_add_block(controller, new_request, response, 
new_done);
+        } else {
+            st.to_protobuf(response->mutable_status());
+        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
-void 
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* 
controller,
                                                     const 
PTabletWriterAddBlockRequest* request,
                                                     
PTabletWriterAddBlockResult* response,
                                                     google::protobuf::Closure* 
done) {
-    VLOG_RPC << "tablet writer add block, id=" << request->id()
-             << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id()
-             << ", current_queued_size=" << 
_tablet_worker_pool.get_queue_size();
     int64_t submit_task_time_ns = MonotonicNanos();
-    _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, 
this]() {
+    bool ret = _heavy_work_pool.offer([request, response, done, 
submit_task_time_ns, this]() {
         int64_t wait_execution_time_ns = MonotonicNanos() - 
submit_task_time_ns;
         brpc::ClosureGuard closure_guard(done);
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
-
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, 
response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add block failed, message=" << 
st
@@ -241,20 +312,32 @@ void 
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
         response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
         response->set_wait_execution_time_us(wait_execution_time_ns / 
NANOS_PER_MICRO);
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
 void 
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* 
controller,
                                                 const 
PTabletWriterCancelRequest* request,
                                                 PTabletWriterCancelResult* 
response,
                                                 google::protobuf::Closure* 
done) {
-    VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" 
<< request->index_id()
-             << ", sender_id=" << request->sender_id();
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->load_channel_mgr()->cancel(*request);
-    if (!st.ok()) {
-        LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
-                     << ", index_id=" << request->index_id()
-                     << ", sender_id=" << request->sender_id();
+    bool ret = _light_work_pool.offer([this, request, done]() {
+        VLOG_RPC << "tablet writer cancel, id=" << request->id()
+                 << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id();
+        brpc::ClosureGuard closure_guard(done);
+        auto st = _exec_env->load_channel_mgr()->cancel(*request);
+        if (!st.ok()) {
+            LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
+                         << ", index_id=" << request->index_id()
+                         << ", sender_id=" << request->sender_id();
+        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
     }
 }
 
@@ -298,125 +381,149 @@ Status PInternalServiceImpl::_exec_plan_fragment(const 
std::string& ser_request,
     }
 }
 
-void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
controller,
                                                 const 
PCancelPlanFragmentRequest* request,
                                                 PCancelPlanFragmentResult* 
result,
                                                 google::protobuf::Closure* 
done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", 
cntl_base);
-    auto scope = OpentelemetryScope {span};
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId tid;
-    tid.__set_hi(request->finst_id().hi());
-    tid.__set_lo(request->finst_id().lo());
-
-    Status st = Status::OK();
-    if (request->has_cancel_reason()) {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
-                  << ", reason: " << request->cancel_reason();
-        _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
-    } else {
-        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
-        _exec_env->fragment_mgr()->cancel(tid);
+    bool ret = _light_work_pool.offer([this, controller, request, result, 
done]() {
+        auto span = 
telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+        auto scope = OpentelemetryScope {span};
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId tid;
+        tid.__set_hi(request->finst_id().hi());
+        tid.__set_lo(request->finst_id().lo());
+
+        Status st = Status::OK();
+        if (request->has_cancel_reason()) {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid)
+                      << ", reason: " << request->cancel_reason();
+            _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+        } else {
+            LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid);
+            _exec_env->fragment_mgr()->cancel(tid);
+        }
+        // TODO: the logic seems useless, cancel only return Status::OK. 
remove it
+        st.to_protobuf(result->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        result->mutable_status()->add_error_msgs("fail to offer request to the 
work pool");
     }
-
-    // TODO: the logic seems useless, cancel only return Status::OK. remove it
-    st.to_protobuf(result->mutable_status());
 }
 
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
controller,
                                       const PFetchDataRequest* request, 
PFetchDataResult* result,
                                       google::protobuf::Closure* done) {
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
-    _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    bool ret = _heavy_work_pool.offer([this, controller, request, result, 
done]() {
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+        _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        result->mutable_status()->add_error_msgs("fail to offer request to the 
work pool");
+    }
 }
 
 void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* 
controller,
                                               const PFetchTableSchemaRequest* 
request,
                                               PFetchTableSchemaResult* result,
                                               google::protobuf::Closure* done) 
{
-    VLOG_RPC << "fetch table schema";
-    brpc::ClosureGuard closure_guard(done);
-    TFileScanRange file_scan_range;
-    Status st = Status::OK();
-    {
-        const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
-        uint32_t len = request->file_scan_range().size();
-        st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+    bool ret = _heavy_work_pool.offer([request, result, done]() {
+        VLOG_RPC << "fetch table schema";
+        brpc::ClosureGuard closure_guard(done);
+        TFileScanRange file_scan_range;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
+            uint32_t len = request->file_scan_range().size();
+            st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
+            if (!st.ok()) {
+                LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+                st.to_protobuf(result->mutable_status());
+                return;
+            }
+        }
+        if (file_scan_range.__isset.ranges == false) {
+            st = Status::InternalError("can not get TFileRangeDesc.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        if (file_scan_range.__isset.params == false) {
+            st = Status::InternalError("can not get TFileScanRangeParams.");
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+        const TFileScanRangeParams& params = file_scan_range.params;
+
+        std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+        std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
+        IOContext io_ctx;
+        FileCacheStatistics file_cache_statis;
+        io_ctx.file_cache_stats = &file_cache_statis;
+        switch (params.format_type) {
+        case TFileFormatType::FORMAT_CSV_PLAIN:
+        case TFileFormatType::FORMAT_CSV_GZ:
+        case TFileFormatType::FORMAT_CSV_BZ2:
+        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+        case TFileFormatType::FORMAT_CSV_LZOP:
+        case TFileFormatType::FORMAT_CSV_DEFLATE: {
+            // file_slots is no use
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(
+                    new vectorized::CsvReader(profile.get(), params, range, 
file_slots, &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_PARQUET: {
+            reader.reset(new vectorized::ParquetReader(params, range, 
&io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_ORC: {
+            std::vector<std::string> column_names;
+            reader.reset(new vectorized::OrcReader(params, range, 
column_names, "", &io_ctx));
+            break;
+        }
+        case TFileFormatType::FORMAT_JSON: {
+            std::vector<SlotDescriptor*> file_slots;
+            reader.reset(new vectorized::NewJsonReader(profile.get(), params, 
range, file_slots,
+                                                       &io_ctx));
+            break;
+        }
+        default:
+            st = Status::InternalError("Not supported file format in fetch 
table schema: {}",
+                                       params.format_type);
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        std::vector<std::string> col_names;
+        std::vector<TypeDescriptor> col_types;
+        st = reader->get_parsed_schema(&col_names, &col_types);
         if (!st.ok()) {
             LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
             st.to_protobuf(result->mutable_status());
             return;
         }
-    }
-    if (file_scan_range.__isset.ranges == false) {
-        st = Status::InternalError("can not get TFileRangeDesc.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    if (file_scan_range.__isset.params == false) {
-        st = Status::InternalError("can not get TFileScanRangeParams.");
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    const TFileRangeDesc& range = file_scan_range.ranges.at(0);
-    const TFileScanRangeParams& params = file_scan_range.params;
-
-    std::unique_ptr<vectorized::GenericReader> reader(nullptr);
-    std::unique_ptr<RuntimeProfile> profile(new 
RuntimeProfile("FetchTableSchema"));
-    IOContext io_ctx;
-    FileCacheStatistics file_cache_statis;
-    io_ctx.file_cache_stats = &file_cache_statis;
-    switch (params.format_type) {
-    case TFileFormatType::FORMAT_CSV_PLAIN:
-    case TFileFormatType::FORMAT_CSV_GZ:
-    case TFileFormatType::FORMAT_CSV_BZ2:
-    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
-    case TFileFormatType::FORMAT_CSV_LZOP:
-    case TFileFormatType::FORMAT_CSV_DEFLATE: {
-        // file_slots is no use
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(new vectorized::CsvReader(profile.get(), params, range, 
file_slots, &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_PARQUET: {
-        reader.reset(new vectorized::ParquetReader(params, range, &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_ORC: {
-        std::vector<std::string> column_names;
-        reader.reset(new vectorized::OrcReader(params, range, column_names, 
"", &io_ctx));
-        break;
-    }
-    case TFileFormatType::FORMAT_JSON: {
-        std::vector<SlotDescriptor*> file_slots;
-        reader.reset(
-                new vectorized::NewJsonReader(profile.get(), params, range, 
file_slots, &io_ctx));
-        break;
-    }
-    default:
-        st = Status::InternalError("Not supported file format in fetch table 
schema: {}",
-                                   params.format_type);
-        st.to_protobuf(result->mutable_status());
-        return;
-    }
-    std::vector<std::string> col_names;
-    std::vector<TypeDescriptor> col_types;
-    st = reader->get_parsed_schema(&col_names, &col_types);
-    if (!st.ok()) {
-        LOG(WARNING) << "fetch table schema failed, errmsg=" << st;
+        result->set_column_nums(col_names.size());
+        for (size_t idx = 0; idx < col_names.size(); ++idx) {
+            result->add_column_names(col_names[idx]);
+        }
+        for (size_t idx = 0; idx < col_types.size(); ++idx) {
+            PTypeDesc* type_desc = result->add_column_types();
+            col_types[idx].to_protobuf(type_desc);
+        }
         st.to_protobuf(result->mutable_status());
-        return;
-    }
-    result->set_column_nums(col_names.size());
-    for (size_t idx = 0; idx < col_names.size(); ++idx) {
-        result->add_column_names(col_names[idx]);
-    }
-    for (size_t idx = 0; idx < col_types.size(); ++idx) {
-        PTypeDesc* type_desc = result->add_column_types();
-        col_types[idx].to_protobuf(type_desc);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        result->mutable_status()->add_error_msgs("fail to offer request to the 
work pool");
     }
-    st.to_protobuf(result->mutable_status());
 }
 
 Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* 
request,
@@ -435,200 +542,278 @@ void 
PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co
                                              const PTabletKeyLookupRequest* 
request,
                                              PTabletKeyLookupResponse* 
response,
                                              google::protobuf::Closure* done) {
-    [[maybe_unused]] brpc::Controller* cntl = 
static_cast<brpc::Controller*>(controller);
-    brpc::ClosureGuard guard(done);
-    Status st = _tablet_fetch_data(request, response);
-    st.to_protobuf(response->mutable_status());
+    bool ret = _heavy_work_pool.offer([this, controller, request, response, 
done]() {
+        [[maybe_unused]] brpc::Controller* cntl = 
static_cast<brpc::Controller*>(controller);
+        brpc::ClosureGuard guard(done);
+        Status st = _tablet_fetch_data(request, response);
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
 void PInternalServiceImpl::get_info(google::protobuf::RpcController* 
controller,
                                     const PProxyRequest* request, 
PProxyResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    // PProxyRequest is defined in gensrc/proto/internal_service.proto
-    // Currently it supports 2 kinds of requests:
-    // 1. get all kafka partition ids for given topic
-    // 2. get all kafka partition offsets for given topic and timestamp.
-    if (request->has_kafka_meta_request()) {
-        const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
-        if (!kafka_request.partition_id_for_latest_offsets().empty()) {
-            // get latest offsets for specified partition ids
-            std::vector<PIntegerPair> partition_offsets;
-            Status st = _exec_env->routine_load_task_executor()
-                                ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), 
&partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+    bool ret = _heavy_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        // PProxyRequest is defined in gensrc/proto/internal_service.proto
+        // Currently it supports 2 kinds of requests:
+        // 1. get all kafka partition ids for given topic
+        // 2. get all kafka partition offsets for given topic and timestamp.
+        if (request->has_kafka_meta_request()) {
+            const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
+            if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+                // get latest offsets for specified partition ids
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_latest_offsets_for_partitions(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else if (!kafka_request.offset_times().empty()) {
-            // if offset_times() has elements, which means this request is to 
get offset by timestamp.
-            std::vector<PIntegerPair> partition_offsets;
-            Status st =
-                    
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
-                            request->kafka_meta_request(), &partition_offsets);
-            if (st.ok()) {
-                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
-                for (const auto& entry : partition_offsets) {
-                    PIntegerPair* res = part_offsets->add_offset_times();
-                    res->set_key(entry.key());
-                    res->set_val(entry.val());
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else if (!kafka_request.offset_times().empty()) {
+                // if offset_times() has elements, which means this request is 
to get offset by timestamp.
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_partition_offsets_for_times(
+                                            request->kafka_meta_request(), 
&partition_offsets);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
                 }
-            }
-            st.to_protobuf(response->mutable_status());
-            return;
-        } else {
-            // get partition ids of topic
-            std::vector<int32_t> partition_ids;
-            Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
-                    request->kafka_meta_request(), &partition_ids);
-            if (st.ok()) {
-                PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
-                for (int32_t id : partition_ids) {
-                    kafka_result->add_partition_ids(id);
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else {
+                // get partition ids of topic
+                std::vector<int32_t> partition_ids;
+                Status st = 
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+                        request->kafka_meta_request(), &partition_ids);
+                if (st.ok()) {
+                    PKafkaMetaProxyResult* kafka_result = 
response->mutable_kafka_meta_result();
+                    for (int32_t id : partition_ids) {
+                        kafka_result->add_partition_ids(id);
+                    }
                 }
+                st.to_protobuf(response->mutable_status());
+                return;
             }
-            st.to_protobuf(response->mutable_status());
-            return;
         }
+        Status::OK().to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    Status::OK().to_protobuf(response->mutable_status());
 }
 
 void PInternalServiceImpl::update_cache(google::protobuf::RpcController* 
controller,
                                         const PUpdateCacheRequest* request,
                                         PCacheResponse* response, 
google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->update(request, response);
+    bool ret = _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->update(request, response);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->set_status(PCacheStatus::CANCELED);
+    }
 }
 
 void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* 
controller,
                                        const PFetchCacheRequest* request, 
PFetchCacheResult* result,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->fetch(request, result);
+    bool ret = _heavy_work_pool.offer([this, request, result, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->fetch(request, result);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        result->set_status(PCacheStatus::CANCELED);
+    }
 }
 
 void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* 
controller,
                                        const PClearCacheRequest* request, 
PCacheResponse* response,
                                        google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    _exec_env->result_cache()->clear(request, response);
+    bool ret = _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        _exec_env->result_cache()->clear(request, response);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->set_status(PCacheStatus::CANCELED);
+    }
 }
 
 void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* 
controller,
                                         const ::doris::PMergeFilterRequest* 
request,
                                         ::doris::PMergeFilterResponse* 
response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "merge meet error" << st.to_string();
+    bool ret = _light_work_pool.offer([this, controller, request, response, 
done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "merge meet error" << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    st.to_protobuf(response->mutable_status());
 }
 
 void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* 
controller,
                                         const ::doris::PPublishFilterRequest* 
request,
                                         ::doris::PPublishFilterResponse* 
response,
                                         ::google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
-    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-    UniqueId unique_id(request->query_id());
-    VLOG_NOTICE << "rpc apply_filter recv";
-    Status st = _exec_env->fragment_mgr()->apply_filter(request, 
&zero_copy_input_stream);
-    if (!st.ok()) {
-        LOG(WARNING) << "apply filter meet error: " << st.to_string();
+    bool ret = _light_work_pool.offer([this, controller, request, response, 
done]() {
+        brpc::ClosureGuard closure_guard(done);
+        auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
+        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+        UniqueId unique_id(request->query_id());
+        VLOG_NOTICE << "rpc apply_filter recv";
+        Status st = _exec_env->fragment_mgr()->apply_filter(request, 
&zero_copy_input_stream);
+        if (!st.ok()) {
+            LOG(WARNING) << "apply filter meet error: " << st.to_string();
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    st.to_protobuf(response->mutable_status());
 }
 
 void PInternalServiceImpl::send_data(google::protobuf::RpcController* 
controller,
                                      const PSendDataRequest* request, 
PSendDataResult* response,
                                      google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        for (int i = 0; i < request->data_size(); ++i) {
-            PDataRow* row = new PDataRow();
-            row->CopyFrom(request->data(i));
-            pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
-                                   sizeof(row) + row->ByteSizeLong());
+    bool ret = _heavy_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            for (int i = 0; i < request->data_size(); ++i) {
+                PDataRow* row = new PDataRow();
+                row->CopyFrom(request->data(i));
+                pipe->append_and_flush(reinterpret_cast<char*>(&row), 
sizeof(row),
+                                       sizeof(row) + row->ByteSizeLong());
+            }
+            response->mutable_status()->set_status_code(0);
         }
-        response->mutable_status()->set_status_code(0);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
 void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
                                   const PCommitRequest* request, 
PCommitResult* response,
                                   google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        pipe->finish();
-        response->mutable_status()->set_status_code(0);
+    bool ret = _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            pipe->finish();
+            response->mutable_status()->set_status_code(0);
+        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
 void PInternalServiceImpl::rollback(google::protobuf::RpcController* 
controller,
                                     const PRollbackRequest* request, 
PRollbackResult* response,
                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    TUniqueId fragment_instance_id;
-    fragment_instance_id.hi = request->fragment_instance_id().hi();
-    fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-    if (pipe == nullptr) {
-        response->mutable_status()->set_status_code(1);
-        response->mutable_status()->add_error_msgs("pipe is null");
-    } else {
-        pipe->cancel("rollback");
-        response->mutable_status()->set_status_code(0);
+    bool ret = _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        TUniqueId fragment_instance_id;
+        fragment_instance_id.hi = request->fragment_instance_id().hi();
+        fragment_instance_id.lo = request->fragment_instance_id().lo();
+
+        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+        if (pipe == nullptr) {
+            response->mutable_status()->set_status_code(1);
+            response->mutable_status()->add_error_msgs("pipe is null");
+        } else {
+            pipe->cancel("rollback");
+            response->mutable_status()->set_status_code(0);
+        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
-void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* 
controller,
                                               const PConstantExprRequest* 
request,
                                               PConstantExprResult* response,
                                               google::protobuf::Closure* done) 
{
-    brpc::ClosureGuard closure_guard(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-
-    Status st = Status::OK();
-    if (request->has_request()) {
+    bool ret = _light_work_pool.offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = Status::OK();
         st = _fold_constant_expr(request->request(), response);
-    } else {
-        // TODO(yangzhengguo) this is just for compatible with old version, 
this should be removed in the release 0.15
-        st = _fold_constant_expr(cntl->request_attachment().to_string(), 
response);
-    }
-    if (!st.ok()) {
-        LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
+        if (!st.ok()) {
+            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st;
+        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    st.to_protobuf(response->mutable_status());
 }
 
 Status PInternalServiceImpl::_fold_constant_expr(const std::string& 
ser_request,
@@ -643,31 +828,48 @@ Status PInternalServiceImpl::_fold_constant_expr(const 
std::string& ser_request,
     return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
 }
 
-void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* 
controller,
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done) {
-    // TODO(zxy) delete in 1.2 version
-    google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+    bool ret = _heavy_work_pool.offer([this, controller, request, response, 
done]() {
+        // TODO(zxy) delete in 1.2 version
+        google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
 
-    _transmit_block(cntl_base, request, response, new_done, Status::OK());
+        _transmit_block(controller, request, response, new_done, Status::OK());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
-void 
PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* 
cntl_base,
+void 
PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* 
controller,
                                                   const PEmptyRequest* request,
                                                   PTransmitDataResult* 
response,
                                                   google::protobuf::Closure* 
done) {
-    PTransmitDataParams* new_request = new PTransmitDataParams();
-    google::protobuf::Closure* new_done =
-            new NewHttpClosure<PTransmitDataParams>(new_request, done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    Status st = 
attachment_extract_request_contain_block<PTransmitDataParams>(new_request, 
cntl);
-    _transmit_block(cntl_base, new_request, response, new_done, st);
+    bool ret = _heavy_work_pool.offer([this, controller, response, done]() {
+        PTransmitDataParams* new_request = new PTransmitDataParams();
+        google::protobuf::Closure* new_done =
+                new NewHttpClosure<PTransmitDataParams>(new_request, done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        Status st =
+                
attachment_extract_request_contain_block<PTransmitDataParams>(new_request, 
cntl);
+        _transmit_block(controller, new_request, response, new_done, st);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
-void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* 
controller,
                                            const PTransmitDataParams* request,
                                            PTransmitDataResult* response,
                                            google::protobuf::Closure* done,
@@ -705,25 +907,34 @@ void 
PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
                                              const PCheckRPCChannelRequest* 
request,
                                              PCheckRPCChannelResponse* 
response,
                                              google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(0);
-    if (request->data().size() != request->size()) {
-        std::stringstream ss;
-        ss << "data size not same, expected: " << request->size()
-           << ", actual: " << request->data().size();
-        response->mutable_status()->add_error_msgs(ss.str());
-        response->mutable_status()->set_status_code(1);
-
-    } else {
-        Md5Digest digest;
-        digest.update(static_cast<const void*>(request->data().c_str()), 
request->data().size());
-        digest.digest();
-        if (!iequal(digest.hex(), request->md5())) {
+    bool ret = _light_work_pool.offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(0);
+        if (request->data().size() != request->size()) {
             std::stringstream ss;
-            ss << "md5 not same, expected: " << request->md5() << ", actual: " 
<< digest.hex();
+            ss << "data size not same, expected: " << request->size()
+               << ", actual: " << request->data().size();
             response->mutable_status()->add_error_msgs(ss.str());
             response->mutable_status()->set_status_code(1);
+
+        } else {
+            Md5Digest digest;
+            digest.update(static_cast<const void*>(request->data().c_str()),
+                          request->data().size());
+            digest.digest();
+            if (!iequal(digest.hex(), request->md5())) {
+                std::stringstream ss;
+                ss << "md5 not same, expected: " << request->md5() << ", 
actual: " << digest.hex();
+                response->mutable_status()->add_error_msgs(ss.str());
+                response->mutable_status()->set_status_code(1);
+            }
         }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
@@ -731,44 +942,60 @@ void 
PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
                                              const PResetRPCChannelRequest* 
request,
                                              PResetRPCChannelResponse* 
response,
                                              google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(0);
-    if (request->all()) {
-        int size = 
ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
-        if (size > 0) {
-            std::vector<std::string> endpoints;
-            
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
-            ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
-            *response->mutable_channels() = {endpoints.begin(), 
endpoints.end()};
-        }
-    } else {
-        for (const std::string& endpoint : request->endpoints()) {
-            if 
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
-                response->mutable_status()->add_error_msgs(endpoint + ": not 
found.");
-                continue;
+    bool ret = _light_work_pool.offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(0);
+        if (request->all()) {
+            int size = 
ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
+            if (size > 0) {
+                std::vector<std::string> endpoints;
+                
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
+                ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
+                *response->mutable_channels() = {endpoints.begin(), 
endpoints.end()};
             }
+        } else {
+            for (const std::string& endpoint : request->endpoints()) {
+                if 
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
+                    response->mutable_status()->add_error_msgs(endpoint + ": 
not found.");
+                    continue;
+                }
 
-            if 
(ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
-                response->add_channels(endpoint);
-            } else {
-                response->mutable_status()->add_error_msgs(endpoint + ": reset 
failed.");
+                if 
(ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
+                    response->add_channels(endpoint);
+                } else {
+                    response->mutable_status()->add_error_msgs(endpoint + ": 
reset failed.");
+                }
+            }
+            if (request->endpoints_size() != response->channels_size()) {
+                response->mutable_status()->set_status_code(1);
             }
         }
-        if (request->endpoints_size() != response->channels_size()) {
-            response->mutable_status()->set_status_code(1);
-        }
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
-void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* 
cntl_base,
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* 
controller,
                                       const PHandShakeRequest* request,
                                       PHandShakeResponse* response,
                                       google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    if (request->has_hello()) {
-        response->set_hello(request->hello());
+    bool ret = _light_work_pool.offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        if (request->has_hello()) {
+            response->set_hello(request->hello());
+        }
+        response->mutable_status()->set_status_code(0);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
-    response->mutable_status()->set_status_code(0);
 }
 
 void PInternalServiceImpl::request_slave_tablet_pull_rowset(
@@ -783,8 +1010,8 @@ void 
PInternalServiceImpl::request_slave_tablet_pull_rowset(
     int64_t brpc_port = request->brpc_port();
     std::string token = request->token();
     int64_t node_id = request->node_id();
-    _slave_replica_worker_pool.offer([rowset_meta_pb, host, brpc_port, 
node_id, segments_size,
-                                      http_port, token, rowset_path, this]() {
+    bool ret = _heavy_work_pool.offer([rowset_meta_pb, host, brpc_port, 
node_id, segments_size,
+                                       http_port, token, rowset_path, this]() {
         TabletSharedPtr tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
                 rowset_meta_pb.tablet_id(), 
rowset_meta_pb.tablet_schema_hash());
         if (tablet == nullptr) {
@@ -925,6 +1152,12 @@ void 
PInternalServiceImpl::request_slave_tablet_pull_rowset(
         _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
                                     rowset_meta->tablet_id(), node_id, true);
     });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
     Status::OK().to_protobuf(response->mutable_status());
 }
 
@@ -983,14 +1216,22 @@ void 
PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
 void PInternalServiceImpl::response_slave_tablet_pull_rowset(
         google::protobuf::RpcController* controller, const 
PTabletWriteSlaveDoneRequest* request,
         PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* 
done) {
-    brpc::ClosureGuard closure_guard(done);
-    VLOG_CRITICAL
-            << "receive the result of slave replica pull rowset from slave 
replica. slave server="
-            << request->node_id() << ", is_succeed=" << request->is_succeed()
-            << ", tablet_id=" << request->tablet_id() << ", txn_id=" << 
request->txn_id();
-    StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
-            request->txn_id(), request->tablet_id(), request->node_id(), 
request->is_succeed());
-    Status::OK().to_protobuf(response->mutable_status());
+    bool ret = _heavy_work_pool.offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        VLOG_CRITICAL << "receive the result of slave replica pull rowset from 
slave replica. "
+                         "slave server="
+                      << request->node_id() << ", is_succeed=" << 
request->is_succeed()
+                      << ", tablet_id=" << request->tablet_id() << ", txn_id=" 
<< request->txn_id();
+        
StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
+                request->txn_id(), request->tablet_id(), request->node_id(), 
request->is_succeed());
+        Status::OK().to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
 }
 
 static Status read_by_rowids(
@@ -1109,9 +1350,7 @@ void 
PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
                                          const PMultiGetRequest* request,
                                          PMultiGetResponse* response,
                                          google::protobuf::Closure* done) {
-    // Submit task to seperate ThreadPool for avoiding block bthread working 
pthread
-    ThreadPool* task_pool = 
StorageEngine::instance()->get_bg_multiget_threadpool();
-    Status submit_st = task_pool->submit_func([request, response, done, 
this]() {
+    bool ret = _heavy_work_pool.offer([request, response, done, this]() {
         // multi get data by rowid
         MonotonicStopWatch watch;
         watch.start();
@@ -1121,9 +1360,11 @@ void 
PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
         st.to_protobuf(response->mutable_status());
         LOG(INFO) << "multiget_data finished, cost(us):" << 
watch.elapsed_time() / 1000;
     });
-    if (!submit_st.ok()) {
-        submit_st.to_protobuf(response->mutable_status());
-        done->Run();
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 103293a745..4c500a245f 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -190,8 +190,13 @@ private:
 
 private:
     ExecEnv* _exec_env;
-    PriorityThreadPool _tablet_worker_pool;
-    PriorityThreadPool _slave_replica_worker_pool;
+
+    // every brpc service request should put into thread pool
+    // the reason see issue #16634
+    // define the interface for reading and writing data as heavy interface
+    // otherwise as light interface
+    PriorityThreadPool _heavy_work_pool;
+    PriorityThreadPool _light_work_pool;
 };
 
 } // namespace doris
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 87afed763b..4982862035 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -222,6 +222,16 @@ public:
     IntCounter* upload_rowset_count;
     IntCounter* upload_fail_count;
 
+    UIntGauge* light_work_pool_queue_size;
+    UIntGauge* heavy_work_pool_queue_size;
+    UIntGauge* heavy_work_active_threads;
+    UIntGauge* light_work_active_threads;
+
+    UIntGauge* heavy_work_pool_max_queue_size;
+    UIntGauge* light_work_pool_max_queue_size;
+    UIntGauge* heavy_work_max_threads;
+    UIntGauge* light_work_max_threads;
+
     static DorisMetrics* instance() {
         static DorisMetrics instance;
         return &instance;
diff --git a/be/src/util/priority_thread_pool.hpp 
b/be/src/util/priority_thread_pool.hpp
index 3bbc53788c..7e2b8b5f77 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -55,7 +55,7 @@ public:
     //     queue exceeds this size, subsequent calls to Offer will block until 
there is
     //     capacity available.
     PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const 
std::string& name)
-            : _work_queue(queue_size), _shutdown(false), _name(name) {
+            : _work_queue(queue_size), _shutdown(false), _name(name), 
_active_threads(0) {
         for (int i = 0; i < num_threads; ++i) {
             _threads.create_thread(
                     
std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
@@ -101,6 +101,7 @@ public:
     virtual void join() { _threads.join_all(); }
 
     virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
+    virtual uint32_t get_active_threads() const { return _active_threads; }
 
     // Blocks until the work queue is empty, and then calls shutdown to stop 
the worker
     // threads and Join to wait until they are finished.
@@ -136,7 +137,9 @@ private:
         while (!is_shutdown()) {
             Task task;
             if (_work_queue.blocking_get(&task)) {
+                _active_threads++;
                 task.work_function();
+                _active_threads--;
             }
             if (_work_queue.get_size() == 0) {
                 _empty_cv.notify_all();
@@ -151,6 +154,7 @@ private:
     // Set to true when threads should stop doing work and terminate.
     std::atomic<bool> _shutdown;
     std::string _name;
+    std::atomic<int> _active_threads;
 };
 
 } // namespace doris
diff --git 
a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md 
b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 3c7b49d6bd..0d542431f3 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -298,6 +298,12 @@ curl http://be_host:webserver_port/metrics?type=json
 |`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 
如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
 |`doris_be_all_rowset_nums`| | Num | 当前所有 rowset 的个数 | | P0 |
 |`doris_be_all_segment_nums`| | Num | 当前所有 segment 的个数 | | P0 |
+|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | p0 |
+|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | p0 | 
+|`doris_be_heavy_work_pool_queue_size`| | Num | brpc 
heavy线程池队列最大长度,超过则阻塞提交work| | p0 |
+|`doris_be_light_work_pool_queue_size`| | Num | brpc 
light线程池队列最大长度,超过则阻塞提交work| | p0 |
+|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数| | p0 |
+|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数| | p0 |
 
 ### 机器监控
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 984487ebd2..05746715c3 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -276,6 +276,7 @@ enum PCacheStatus {
     INVALID_KEY_RANGE = 6;
     DATA_OVERDUE = 7;
     EMPTY_DATA = 8;
+    CANCELED = 9;
 };
 
 enum CacheType {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to