This is an automated email from the ASF dual-hosted git repository.

linzhongcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0e3be4eff5 [Improvement](brpc) Using a thread pool for RPC service avoiding std::mutex block brpc::bthread (#16639) 0e3be4eff5 is described below commit 0e3be4eff59b8e4e3d21791a8bb7e8cb496d2f2f Author: chenlinzhong <490103...@qq.com> AuthorDate: Wed Feb 22 14:15:47 2023 +0800 [Improvement](brpc) Using a thread pool for RPC service avoiding std::mutex block brpc::bthread (#16639) mainly include: - brpc service adds two types of thread pools. The number of "light" and "heavy" thread pools is different Classify the interfaces of be. Those related to data transmission are classified as heavy interfaces and others as light interfaces - Add some monitoring to the thread pool, including the queue size and the number of active threads. Use these - indicators to guide the configuration of the number of threads --- be/src/common/config.h | 14 +- be/src/service/internal_service.cpp | 953 +++++++++++++-------- be/src/service/internal_service.h | 9 +- be/src/util/doris_metrics.h | 10 + be/src/util/priority_thread_pool.hpp | 6 +- .../maint-monitor/monitor-metrics/metrics.md | 6 + gensrc/proto/internal_service.proto | 1 + 7 files changed, 637 insertions(+), 362 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 8516ea4fa9..f15d30bc7d 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060"); // port for brpc CONF_Int32(brpc_port, "8060"); -// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores +// the number of bthreads for brpc, the default value is set to -1, +// which means the number of bthreads is #cpu-cores CONF_Int32(brpc_num_threads, "-1"); // port to brpc server for single replica load @@ -388,8 +389,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64"); CONF_Int64(load_data_reserve_hours, "4"); // log error log will be removed after this time CONF_mInt64(load_error_log_reserve_hours, "48"); -CONF_Int32(number_tablet_writer_threads, "16"); -CONF_Int32(number_slave_replica_download_threads, "64"); + +// be brpc interface is classified into two categories: light and heavy +// each category has diffrent thread number +// threads to handle heavy api interface, such as transmit_data/transmit_block etc +CONF_Int32(brpc_heavy_work_pool_threads, "192"); +// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start +CONF_Int32(brpc_light_work_pool_threads, "32"); +CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240"); +CONF_Int32(brpc_light_work_pool_max_queue_size, "10240"); // The maximum amount of data that can be processed by a stream load CONF_mInt64(streaming_load_max_mb, "10240"); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index a8ed5ecfd8..3b271e4152 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -70,7 +70,15 @@ using namespace ErrorCode; const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3; -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT); + 
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT); bthread_key_t btls_key; @@ -104,32 +112,58 @@ private: PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env) : _exec_env(exec_env), - _tablet_worker_pool(config::number_tablet_writer_threads, 10240, "tablet_writer"), - _slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240, - "replica_download") { - REGISTER_HOOK_METRIC(add_batch_task_queue_size, - [this]() { return _tablet_worker_pool.get_queue_size(); }); + _heavy_work_pool(config::brpc_heavy_work_pool_threads, + config::brpc_heavy_work_pool_max_queue_size, "brpc_heavy"), + _light_work_pool(config::brpc_light_work_pool_threads, + config::brpc_light_work_pool_max_queue_size, "brpc_light") { + REGISTER_HOOK_METRIC(heavy_work_pool_queue_size, + [this]() { return _heavy_work_pool.get_queue_size(); }); + REGISTER_HOOK_METRIC(light_work_pool_queue_size, + [this]() { return _light_work_pool.get_queue_size(); }); + REGISTER_HOOK_METRIC(heavy_work_active_threads, + [this]() { return _heavy_work_pool.get_active_threads(); }); + REGISTER_HOOK_METRIC(light_work_active_threads, + [this]() { return _light_work_pool.get_active_threads(); }); + + REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size, + []() { return config::brpc_heavy_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(light_work_pool_max_queue_size, + []() { return config::brpc_light_work_pool_max_queue_size; }); + REGISTER_HOOK_METRIC(heavy_work_max_threads, + []() { return config::brpc_heavy_work_pool_threads; }); + REGISTER_HOOK_METRIC(light_work_max_threads, + []() { return config::brpc_light_work_pool_threads; }); + CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter)); CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, AsyncIO::io_ctx_key_deleter)); } PInternalServiceImpl::~PInternalServiceImpl() { - DEREGISTER_HOOK_METRIC(add_batch_task_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(light_work_pool_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_active_threads); + DEREGISTER_HOOK_METRIC(light_work_active_threads); + + DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size); + DEREGISTER_HOOK_METRIC(heavy_work_max_threads); + DEREGISTER_HOOK_METRIC(light_work_max_threads); + CHECK_EQ(0, bthread_key_delete(btls_key)); CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key)); } -void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) {} -void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* controller, const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) {} -void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* 
done, @@ -139,22 +173,31 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() - << ", txn_id=" << request->txn_id(); - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->load_channel_mgr()->open(*request); - if (!st.ok()) { - LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id() - << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); + bool ret = _light_work_pool.offer([this, request, response, done]() { + VLOG_RPC << "tablet writer open, id=" << request->id() + << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->load_channel_mgr()->open(*request); + if (!st.ok()) { + LOG(WARNING) << "load channel open failed, message=" << st << ", id=" << request->id() + << ", index_id=" << request->index_id() + << ", txn_id=" << request->txn_id(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } -void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment", cntl_base); + auto span = telemetry::start_rpc_server_span("exec_plan_fragment", controller); auto scope = OpentelemetryScope {span}; brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); @@ -168,67 +211,95 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c st.to_protobuf(response->mutable_status()); } -void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - exec_plan_fragment(cntl_base, request, response, done); + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { + exec_plan_fragment(controller, request, response, done); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller, const PExecPlanFragmentStartRequest* request, PExecPlanFragmentResult* result, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); - auto scope = OpentelemetryScope {span}; - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->fragment_mgr()->start_query_execution(request); - st.to_protobuf(result->mutable_status()); + bool ret = 
_light_work_pool.offer([this, controller, request, result, done]() { + auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); + auto scope = OpentelemetryScope {span}; + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->fragment_mgr()->start_query_execution(request); + st.to_protobuf(result->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl); + bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() { + // TODO(zxy) delete in 1.2 version + google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl); - _tablet_writer_add_block(cntl_base, request, response, new_done); + _tablet_writer_add_block(controller, request, response, new_done); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_add_block_by_http( - google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, + google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); - google::protobuf::Closure* new_done = - new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request, - cntl); - if (st.ok()) { - _tablet_writer_add_block(cntl_base, new_request, response, new_done); - } else { - st.to_protobuf(response->mutable_status()); + bool ret = _heavy_work_pool.offer([this, controller, response, done]() { + PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest(); + google::protobuf::Closure* new_done = + new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>( + new_request, cntl); + if (st.ok()) { + _tablet_writer_add_block(controller, new_request, response, new_done); + } else { + st.to_protobuf(response->mutable_status()); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + 
response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* controller, const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer add block, id=" << request->id() - << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() - << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); int64_t submit_task_time_ns = MonotonicNanos(); - _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, this]() { + bool ret = _heavy_work_pool.offer([request, response, done, submit_task_time_ns, this]() { int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; brpc::ClosureGuard closure_guard(done); int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); if (!st.ok()) { LOG(WARNING) << "tablet writer add block failed, message=" << st @@ -241,20 +312,32 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { - VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() - << ", sender_id=" << request->sender_id(); - brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->load_channel_mgr()->cancel(*request); - if (!st.ok()) { - LOG(WARNING) << "tablet writer cancel failed, id=" << request->id() - << ", index_id=" << request->index_id() - << ", sender_id=" << request->sender_id(); + bool ret = _light_work_pool.offer([this, request, done]() { + VLOG_RPC << "tablet writer cancel, id=" << request->id() + << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); + brpc::ClosureGuard closure_guard(done); + auto st = _exec_env->load_channel_mgr()->cancel(*request); + if (!st.ok()) { + LOG(WARNING) << "tablet writer cancel failed, id=" << request->id() + << ", index_id=" << request->index_id() + << ", sender_id=" << request->sender_id(); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); } } @@ -298,125 +381,149 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, } } -void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* controller, const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base); - auto scope = 
OpentelemetryScope {span}; - brpc::ClosureGuard closure_guard(done); - TUniqueId tid; - tid.__set_hi(request->finst_id().hi()); - tid.__set_lo(request->finst_id().lo()); - - Status st = Status::OK(); - if (request->has_cancel_reason()) { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) - << ", reason: " << request->cancel_reason(); - _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); - } else { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); - _exec_env->fragment_mgr()->cancel(tid); + bool ret = _light_work_pool.offer([this, controller, request, result, done]() { + auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); + auto scope = OpentelemetryScope {span}; + brpc::ClosureGuard closure_guard(done); + TUniqueId tid; + tid.__set_hi(request->finst_id().hi()); + tid.__set_lo(request->finst_id().lo()); + + Status st = Status::OK(); + if (request->has_cancel_reason()) { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) + << ", reason: " << request->cancel_reason(); + _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); + } else { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); + _exec_env->fragment_mgr()->cancel(tid); + } + // TODO: the logic seems useless, cancel only return Status::OK. remove it + st.to_protobuf(result->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - - // TODO: the logic seems useless, cancel only return Status::OK. remove it - st.to_protobuf(result->mutable_status()); } -void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); - _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); + bool ret = _heavy_work_pool.offer([this, controller, request, result, done]() { + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); + _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller, const PFetchTableSchemaRequest* request, PFetchTableSchemaResult* result, google::protobuf::Closure* done) { - VLOG_RPC << "fetch table schema"; - brpc::ClosureGuard closure_guard(done); - TFileScanRange file_scan_range; - Status st = Status::OK(); - { - const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); - uint32_t len = request->file_scan_range().size(); - st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + bool ret = _heavy_work_pool.offer([request, result, done]() { + VLOG_RPC << "fetch table schema"; + brpc::ClosureGuard closure_guard(done); + 
TFileScanRange file_scan_range; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); + uint32_t len = request->file_scan_range().size(); + st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + if (!st.ok()) { + LOG(WARNING) << "fetch table schema failed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + } + if (file_scan_range.__isset.ranges == false) { + st = Status::InternalError("can not get TFileRangeDesc."); + st.to_protobuf(result->mutable_status()); + return; + } + if (file_scan_range.__isset.params == false) { + st = Status::InternalError("can not get TFileScanRangeParams."); + st.to_protobuf(result->mutable_status()); + return; + } + const TFileRangeDesc& range = file_scan_range.ranges.at(0); + const TFileScanRangeParams& params = file_scan_range.params; + + std::unique_ptr<vectorized::GenericReader> reader(nullptr); + std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema")); + IOContext io_ctx; + FileCacheStatistics file_cache_statis; + io_ctx.file_cache_stats = &file_cache_statis; + switch (params.format_type) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_CSV_DEFLATE: { + // file_slots is no use + std::vector<SlotDescriptor*> file_slots; + reader.reset( + new vectorized::CsvReader(profile.get(), params, range, file_slots, &io_ctx)); + break; + } + case TFileFormatType::FORMAT_PARQUET: { + reader.reset(new vectorized::ParquetReader(params, range, &io_ctx)); + break; + } + case TFileFormatType::FORMAT_ORC: { + std::vector<std::string> column_names; + reader.reset(new vectorized::OrcReader(params, range, column_names, "", &io_ctx)); + break; + } + case TFileFormatType::FORMAT_JSON: { + std::vector<SlotDescriptor*> file_slots; + reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots, + &io_ctx)); + break; + } + default: + st = Status::InternalError("Not supported file format in fetch table schema: {}", + params.format_type); + st.to_protobuf(result->mutable_status()); + return; + } + std::vector<std::string> col_names; + std::vector<TypeDescriptor> col_types; + st = reader->get_parsed_schema(&col_names, &col_types); if (!st.ok()) { LOG(WARNING) << "fetch table schema failed, errmsg=" << st; st.to_protobuf(result->mutable_status()); return; } - } - if (file_scan_range.__isset.ranges == false) { - st = Status::InternalError("can not get TFileRangeDesc."); - st.to_protobuf(result->mutable_status()); - return; - } - if (file_scan_range.__isset.params == false) { - st = Status::InternalError("can not get TFileScanRangeParams."); - st.to_protobuf(result->mutable_status()); - return; - } - const TFileRangeDesc& range = file_scan_range.ranges.at(0); - const TFileScanRangeParams& params = file_scan_range.params; - - std::unique_ptr<vectorized::GenericReader> reader(nullptr); - std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema")); - IOContext io_ctx; - FileCacheStatistics file_cache_statis; - io_ctx.file_cache_stats = &file_cache_statis; - switch (params.format_type) { - case TFileFormatType::FORMAT_CSV_PLAIN: - case TFileFormatType::FORMAT_CSV_GZ: - case TFileFormatType::FORMAT_CSV_BZ2: - case TFileFormatType::FORMAT_CSV_LZ4FRAME: - case TFileFormatType::FORMAT_CSV_LZOP: - case TFileFormatType::FORMAT_CSV_DEFLATE: { - // file_slots 
is no use - std::vector<SlotDescriptor*> file_slots; - reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots, &io_ctx)); - break; - } - case TFileFormatType::FORMAT_PARQUET: { - reader.reset(new vectorized::ParquetReader(params, range, &io_ctx)); - break; - } - case TFileFormatType::FORMAT_ORC: { - std::vector<std::string> column_names; - reader.reset(new vectorized::OrcReader(params, range, column_names, "", &io_ctx)); - break; - } - case TFileFormatType::FORMAT_JSON: { - std::vector<SlotDescriptor*> file_slots; - reader.reset( - new vectorized::NewJsonReader(profile.get(), params, range, file_slots, &io_ctx)); - break; - } - default: - st = Status::InternalError("Not supported file format in fetch table schema: {}", - params.format_type); - st.to_protobuf(result->mutable_status()); - return; - } - std::vector<std::string> col_names; - std::vector<TypeDescriptor> col_types; - st = reader->get_parsed_schema(&col_names, &col_types); - if (!st.ok()) { - LOG(WARNING) << "fetch table schema failed, errmsg=" << st; + result->set_column_nums(col_names.size()); + for (size_t idx = 0; idx < col_names.size(); ++idx) { + result->add_column_names(col_names[idx]); + } + for (size_t idx = 0; idx < col_types.size(); ++idx) { + PTypeDesc* type_desc = result->add_column_types(); + col_types[idx].to_protobuf(type_desc); + } st.to_protobuf(result->mutable_status()); - return; - } - result->set_column_nums(col_names.size()); - for (size_t idx = 0; idx < col_names.size(); ++idx) { - result->add_column_names(col_names[idx]); - } - for (size_t idx = 0; idx < col_types.size(); ++idx) { - PTypeDesc* type_desc = result->add_column_types(); - col_types[idx].to_protobuf(type_desc); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->mutable_status()->set_status_code(TStatusCode::CANCELLED); + result->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(result->mutable_status()); } Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* request, @@ -435,200 +542,278 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response, google::protobuf::Closure* done) { - [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); - brpc::ClosureGuard guard(done); - Status st = _tablet_fetch_data(request, response); - st.to_protobuf(response->mutable_status()); + bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() { + [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + brpc::ClosureGuard guard(done); + Status st = _tablet_fetch_data(request, response); + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - // PProxyRequest is defined in gensrc/proto/internal_service.proto - // Currently it supports 2 kinds of requests: - // 1. get all kafka partition ids for given topic - // 2. 
get all kafka partition offsets for given topic and timestamp. - if (request->has_kafka_meta_request()) { - const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); - if (!kafka_request.partition_id_for_latest_offsets().empty()) { - // get latest offsets for specified partition ids - std::vector<PIntegerPair> partition_offsets; - Status st = _exec_env->routine_load_task_executor() - ->get_kafka_latest_offsets_for_partitions( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + bool ret = _heavy_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + // PProxyRequest is defined in gensrc/proto/internal_service.proto + // Currently it supports 2 kinds of requests: + // 1. get all kafka partition ids for given topic + // 2. get all kafka partition offsets for given topic and timestamp. + if (request->has_kafka_meta_request()) { + const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); + if (!kafka_request.partition_id_for_latest_offsets().empty()) { + // get latest offsets for specified partition ids + std::vector<PIntegerPair> partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_latest_offsets_for_partitions( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } - } - st.to_protobuf(response->mutable_status()); - return; - } else if (!kafka_request.offset_times().empty()) { - // if offset_times() has elements, which means this request is to get offset by timestamp. - std::vector<PIntegerPair> partition_offsets; - Status st = - _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + st.to_protobuf(response->mutable_status()); + return; + } else if (!kafka_request.offset_times().empty()) { + // if offset_times() has elements, which means this request is to get offset by timestamp. 
+ std::vector<PIntegerPair> partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_partition_offsets_for_times( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } - } - st.to_protobuf(response->mutable_status()); - return; - } else { - // get partition ids of topic - std::vector<int32_t> partition_ids; - Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( - request->kafka_meta_request(), &partition_ids); - if (st.ok()) { - PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); - for (int32_t id : partition_ids) { - kafka_result->add_partition_ids(id); + st.to_protobuf(response->mutable_status()); + return; + } else { + // get partition ids of topic + std::vector<int32_t> partition_ids; + Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( + request->kafka_meta_request(), &partition_ids); + if (st.ok()) { + PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); + for (int32_t id : partition_ids) { + kafka_result->add_partition_ids(id); + } } + st.to_protobuf(response->mutable_status()); + return; } - st.to_protobuf(response->mutable_status()); - return; } + Status::OK().to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - Status::OK().to_protobuf(response->mutable_status()); } void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller, const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->update(request, response); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->update(request, response); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->fetch(request, result); + bool ret = _heavy_work_pool.offer([this, request, result, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->fetch(request, result); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + result->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->clear(request, response); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->clear(request, response); + 
}); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->set_status(PCacheStatus::CANCELED); + } } void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller, const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); - Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "merge meet error" << st.to_string(); + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "merge meet error" << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller, const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); - UniqueId unique_id(request->query_id()); - VLOG_NOTICE << "rpc apply_filter recv"; - Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "apply filter meet error: " << st.to_string(); + bool ret = _light_work_pool.offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + UniqueId unique_id(request->query_id()); + VLOG_NOTICE << "rpc apply_filter recv"; + Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "apply filter meet error: " << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - 
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - for (int i = 0; i < request->data_size(); ++i) { - PDataRow* row = new PDataRow(); - row->CopyFrom(request->data(i)); - pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row), - sizeof(row) + row->ByteSizeLong()); + bool ret = _heavy_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + for (int i = 0; i < request->data_size(); ++i) { + PDataRow* row = new PDataRow(); + row->CopyFrom(request->data(i)); + pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row), + sizeof(row) + row->ByteSizeLong()); + } + response->mutable_status()->set_status_code(0); } - response->mutable_status()->set_status_code(0); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - pipe->finish(); - response->mutable_status()->set_status_code(0); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->finish(); + response->mutable_status()->set_status_code(0); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - - auto pipe = 
_exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - pipe->cancel("rollback"); - response->mutable_status()->set_status_code(0); + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->cancel("rollback"); + response->mutable_status()->set_status_code(0); + } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* controller, const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - - Status st = Status::OK(); - if (request->has_request()) { + bool ret = _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + Status st = Status::OK(); st = _fold_constant_expr(request->request(), response); - } else { - // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15 - st = _fold_constant_expr(cntl->request_attachment().to_string(), response); - } - if (!st.ok()) { - LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st; + if (!st.ok()) { + LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st; + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - st.to_protobuf(response->mutable_status()); } Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request, @@ -643,31 +828,48 @@ Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request, return FoldConstantExecutor().fold_constant_vexpr(t_request, response); } -void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - attachment_transfer_request_block<PTransmitDataParams>(request, cntl); + bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() { + // TODO(zxy) delete in 1.2 version + google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + attachment_transfer_request_block<PTransmitDataParams>(request, cntl); - _transmit_block(cntl_base, request, response, new_done, Status::OK()); + _transmit_block(controller, request, response, new_done, Status::OK()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* controller, const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - PTransmitDataParams* new_request = new PTransmitDataParams(); - google::protobuf::Closure* new_done = - new NewHttpClosure<PTransmitDataParams>(new_request, done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - Status st = attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl); - _transmit_block(cntl_base, new_request, response, new_done, st); + bool ret = _heavy_work_pool.offer([this, controller, response, done]() { + PTransmitDataParams* new_request = new PTransmitDataParams(); + google::protobuf::Closure* new_done = + new NewHttpClosure<PTransmitDataParams>(new_request, done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + Status st = + attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl); + _transmit_block(controller, new_request, response, new_done, st); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } -void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller, const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done, @@ -705,25 +907,34 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co const PCheckRPCChannelRequest* request, PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(0); - if (request->data().size() != request->size()) { - std::stringstream ss; - ss << "data size not same, expected: " << request->size() - << ", actual: " << request->data().size(); - response->mutable_status()->add_error_msgs(ss.str()); - response->mutable_status()->set_status_code(1); - - } else { - Md5Digest digest; - digest.update(static_cast<const void*>(request->data().c_str()), request->data().size()); - digest.digest(); - if (!iequal(digest.hex(), request->md5())) { + bool ret = _light_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(0); + if (request->data().size() != request->size()) { std::stringstream ss; - ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex(); + ss << "data size not same, expected: " << request->size() + << ", actual: " << request->data().size(); 
response->mutable_status()->add_error_msgs(ss.str()); response->mutable_status()->set_status_code(1); + + } else { + Md5Digest digest; + digest.update(static_cast<const void*>(request->data().c_str()), + request->data().size()); + digest.digest(); + if (!iequal(digest.hex(), request->md5())) { + std::stringstream ss; + ss << "md5 not same, expected: " << request->md5() << ", actual: " << digest.hex(); + response->mutable_status()->add_error_msgs(ss.str()); + response->mutable_status()->set_status_code(1); + } } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } @@ -731,44 +942,60 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co const PResetRPCChannelRequest* request, PResetRPCChannelResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - response->mutable_status()->set_status_code(0); - if (request->all()) { - int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size(); - if (size > 0) { - std::vector<std::string> endpoints; - ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints); - ExecEnv::GetInstance()->brpc_internal_client_cache()->clear(); - *response->mutable_channels() = {endpoints.begin(), endpoints.end()}; - } - } else { - for (const std::string& endpoint : request->endpoints()) { - if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) { - response->mutable_status()->add_error_msgs(endpoint + ": not found."); - continue; + bool ret = _light_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(0); + if (request->all()) { + int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size(); + if (size > 0) { + std::vector<std::string> endpoints; + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints); + ExecEnv::GetInstance()->brpc_internal_client_cache()->clear(); + *response->mutable_channels() = {endpoints.begin(), endpoints.end()}; } + } else { + for (const std::string& endpoint : request->endpoints()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) { + response->mutable_status()->add_error_msgs(endpoint + ": not found."); + continue; + } - if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) { - response->add_channels(endpoint); - } else { - response->mutable_status()->add_error_msgs(endpoint + ": reset failed."); + if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) { + response->add_channels(endpoint); + } else { + response->mutable_status()->add_error_msgs(endpoint + ": reset failed."); + } + } + if (request->endpoints_size() != response->channels_size()) { + response->mutable_status()->set_status_code(1); } } - if (request->endpoints_size() != response->channels_size()) { - response->mutable_status()->set_status_code(1); - } + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } -void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base, +void 
PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controller, const PHandShakeRequest* request, PHandShakeResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - if (request->has_hello()) { - response->set_hello(request->hello()); + bool ret = _light_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + if (request->has_hello()) { + response->set_hello(request->hello()); + } + response->mutable_status()->set_status_code(0); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } - response->mutable_status()->set_status_code(0); } void PInternalServiceImpl::request_slave_tablet_pull_rowset( @@ -783,8 +1010,8 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset( int64_t brpc_port = request->brpc_port(); std::string token = request->token(); int64_t node_id = request->node_id(); - _slave_replica_worker_pool.offer([rowset_meta_pb, host, brpc_port, node_id, segments_size, - http_port, token, rowset_path, this]() { + bool ret = _heavy_work_pool.offer([rowset_meta_pb, host, brpc_port, node_id, segments_size, + http_port, token, rowset_path, this]() { TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash()); if (tablet == nullptr) { @@ -925,6 +1152,12 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset( _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(), rowset_meta->tablet_id(), node_id, true); }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } Status::OK().to_protobuf(response->mutable_status()); } @@ -983,14 +1216,22 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote void PInternalServiceImpl::response_slave_tablet_pull_rowset( google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request, PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - VLOG_CRITICAL - << "receive the result of slave replica pull rowset from slave replica. slave server=" - << request->node_id() << ", is_succeed=" << request->is_succeed() - << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id(); - StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset( - request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed()); - Status::OK().to_protobuf(response->mutable_status()); + bool ret = _heavy_work_pool.offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. 
" + "slave server=" + << request->node_id() << ", is_succeed=" << request->is_succeed() + << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id(); + StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset( + request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed()); + Status::OK().to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } } static Status read_by_rowids( @@ -1109,9 +1350,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro const PMultiGetRequest* request, PMultiGetResponse* response, google::protobuf::Closure* done) { - // Submit task to seperate ThreadPool for avoiding block bthread working pthread - ThreadPool* task_pool = StorageEngine::instance()->get_bg_multiget_threadpool(); - Status submit_st = task_pool->submit_func([request, response, done, this]() { + bool ret = _heavy_work_pool.offer([request, response, done, this]() { // multi get data by rowid MonotonicStopWatch watch; watch.start(); @@ -1121,9 +1360,11 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro st.to_protobuf(response->mutable_status()); LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; }); - if (!submit_st.ok()) { - submit_st.to_protobuf(response->mutable_status()); - done->Run(); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); } } diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 103293a745..4c500a245f 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -190,8 +190,13 @@ private: private: ExecEnv* _exec_env; - PriorityThreadPool _tablet_worker_pool; - PriorityThreadPool _slave_replica_worker_pool; + + // every brpc service request should put into thread pool + // the reason see issue #16634 + // define the interface for reading and writing data as heavy interface + // otherwise as light interface + PriorityThreadPool _heavy_work_pool; + PriorityThreadPool _light_work_pool; }; } // namespace doris diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 87afed763b..4982862035 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -222,6 +222,16 @@ public: IntCounter* upload_rowset_count; IntCounter* upload_fail_count; + UIntGauge* light_work_pool_queue_size; + UIntGauge* heavy_work_pool_queue_size; + UIntGauge* heavy_work_active_threads; + UIntGauge* light_work_active_threads; + + UIntGauge* heavy_work_pool_max_queue_size; + UIntGauge* light_work_pool_max_queue_size; + UIntGauge* heavy_work_max_threads; + UIntGauge* light_work_max_threads; + static DorisMetrics* instance() { static DorisMetrics instance; return &instance; diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp index 3bbc53788c..7e2b8b5f77 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/priority_thread_pool.hpp @@ -55,7 +55,7 @@ public: // queue exceeds this size, subsequent calls to Offer will block until there is // 
capacity available. PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name) - : _work_queue(queue_size), _shutdown(false), _name(name) { + : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) { for (int i = 0; i < num_threads; ++i) { _threads.create_thread( std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i)); @@ -101,6 +101,7 @@ public: virtual void join() { _threads.join_all(); } virtual uint32_t get_queue_size() const { return _work_queue.get_size(); } + virtual uint32_t get_active_threads() const { return _active_threads; } // Blocks until the work queue is empty, and then calls shutdown to stop the worker // threads and Join to wait until they are finished. @@ -136,7 +137,9 @@ private: while (!is_shutdown()) { Task task; if (_work_queue.blocking_get(&task)) { + _active_threads++; task.work_function(); + _active_threads--; } if (_work_queue.get_size() == 0) { _empty_cv.notify_all(); @@ -151,6 +154,7 @@ private: // Set to true when threads should stop doing work and terminate. std::atomic<bool> _shutdown; std::string _name; + std::atomic<int> _active_threads; }; } // namespace doris diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md index 3c7b49d6bd..0d542431f3 100644 --- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md +++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md @@ -298,6 +298,12 @@ curl http://be_host:webserver_port/metrics?type=json |`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 | |`doris_be_all_rowset_nums`| | Num | 当前所有 rowset 的个数 | | P0 | |`doris_be_all_segment_nums`| | Num | 当前所有 segment 的个数 | | P0 | +|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | p0 | +|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | p0 | +|`doris_be_heavy_work_pool_queue_size`| | Num | brpc heavy线程池队列最大长度,超过则阻塞提交work| | p0 | +|`doris_be_light_work_pool_queue_size`| | Num | brpc light线程池队列最大长度,超过则阻塞提交work| | p0 | +|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数| | p0 | +|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数| | p0 | ### 机器监控 diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 984487ebd2..05746715c3 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -276,6 +276,7 @@ enum PCacheStatus { INVALID_KEY_RANGE = 6; DATA_OVERDUE = 7; EMPTY_DATA = 8; + CANCELED = 9; }; enum CacheType { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
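
Every handler touched by this patch follows the same shape: wrap the original method body in a lambda, offer() it to _heavy_work_pool or _light_work_pool, and if offer() returns false (the pool cannot accept the task), take the ClosureGuard on the calling thread and answer CANCELLED with "fail to offer request to the work pool". The sketch below is a minimal, self-contained illustration of that shape only; WorkPool, FakeResponse and handle_rpc are made-up stand-ins for Doris's PriorityThreadPool, the protobuf result types and the brpc service methods, and this pool omits the priority queue and metric registration of the real class.

// Minimal sketch of the "offer or cancel" pattern used by the patched brpc handlers.
// WorkPool / FakeResponse / handle_rpc are illustrative stand-ins, not Doris code.
#include <atomic>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

class WorkPool {
public:
    WorkPool(size_t num_threads, size_t max_queue_size) : _max_queue_size(max_queue_size) {
        for (size_t i = 0; i < num_threads; ++i) {
            _threads.emplace_back([this]() { work_loop(); });
        }
    }

    ~WorkPool() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _shutdown = true;
        }
        _cv.notify_all();
        for (auto& t : _threads) t.join();
    }

    // Fails fast when the bounded queue is full instead of blocking the caller;
    // the caller (a brpc worker in the real service) then cancels the request.
    bool offer(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (_queue.size() >= _max_queue_size) return false;
            _queue.push(std::move(task));
        }
        _cv.notify_one();
        return true;
    }

private:
    void work_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _cv.wait(lock, [this]() { return _shutdown || !_queue.empty(); });
                if (_queue.empty()) return; // shutting down and nothing left to drain
                task = std::move(_queue.front());
                _queue.pop();
            }
            ++_active_threads; // the patch exposes this count through a gauge metric
            task();            // a std::mutex taken here blocks a pool pthread, not a bthread
            --_active_threads;
        }
    }

    const size_t _max_queue_size;
    std::mutex _mutex;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _queue;
    std::vector<std::thread> _threads;
    std::atomic<int> _active_threads {0};
    bool _shutdown = false;
};

// Stand-in for one service method: push the real body into the pool and, if the pool
// rejects it, answer the RPC immediately instead of waiting.
struct FakeResponse {
    int status_code = 0;
    std::string error_msg;
};

void handle_rpc(WorkPool& pool, FakeResponse* response, std::function<void()> body) {
    bool ret = pool.offer([response, body]() {
        body();
        response->status_code = 0;
    });
    if (!ret) {
        response->status_code = 1; // stands in for TStatusCode::CANCELLED in the real handlers
        response->error_msg = "fail to offer request to the work pool";
    }
}

int main() {
    FakeResponse resp;
    {
        WorkPool light_pool(/*num_threads=*/2, /*max_queue_size=*/4);
        handle_rpc(light_pool, &resp, []() { std::cout << "light rpc handled\n"; });
    } // the destructor drains the queue and joins, so resp is safe to read here
    std::cout << "status=" << resp.status_code << "\n";
}

The point of the fallback is that the brpc worker never waits on the pool: a saturated pool turns into an immediately cancelled RPC rather than a blocked bthread, and the new gauges (heavy/light queue size, active threads, and their configured maxima) indicate when brpc_heavy_work_pool_threads, brpc_light_work_pool_threads or the *_max_queue_size limits need tuning.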