morningman commented on code in PR #16639: URL: https://github.com/apache/doris/pull/16639#discussion_r1103927268
########## be/src/service/internal_service.cpp: ########## @@ -286,125 +334,134 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, } } -void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* controller, const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, google::protobuf::Closure* done) { - auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base); - auto scope = OpentelemetryScope {span}; - brpc::ClosureGuard closure_guard(done); - TUniqueId tid; - tid.__set_hi(request->finst_id().hi()); - tid.__set_lo(request->finst_id().lo()); - - Status st = Status::OK(); - if (request->has_cancel_reason()) { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) - << ", reason: " << request->cancel_reason(); - _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); - } else { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); - _exec_env->fragment_mgr()->cancel(tid); - } - - // TODO: the logic seems useless, cancel only return Status::OK. remove it - st.to_protobuf(result->mutable_status()); + DorisMetrics::instance()->cancel_plan_fragment->increment(1); + _light_work_pool.offer([this, controller, request, result, done]() { + auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); + auto scope = OpentelemetryScope {span}; + brpc::ClosureGuard closure_guard(done); + TUniqueId tid; + tid.__set_hi(request->finst_id().hi()); + tid.__set_lo(request->finst_id().lo()); + + Status st = Status::OK(); + if (request->has_cancel_reason()) { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) + << ", reason: " << request->cancel_reason(); + _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); + } else { + LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); + _exec_env->fragment_mgr()->cancel(tid); + } + // TODO: the logic seems useless, cancel only return Status::OK. remove it + st.to_protobuf(result->mutable_status()); + }); } -void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); - GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); - _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); + DorisMetrics::instance()->fetch_data->increment(1); + _heavy_work_pool.offer([this, controller, request, result, done]() { + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); + _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); + }); } void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller, const PFetchTableSchemaRequest* request, PFetchTableSchemaResult* result, google::protobuf::Closure* done) { - VLOG_RPC << "fetch table schema"; - brpc::ClosureGuard closure_guard(done); - TFileScanRange file_scan_range; - Status st = Status::OK(); - { - const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); - uint32_t len = request->file_scan_range().size(); - st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + DorisMetrics::instance()->fetch_table_schema->increment(1); + _light_work_pool.offer([request, result, done]() { Review Comment: This is a heavy work, it may read the remote file ########## be/src/service/internal_service.cpp: ########## @@ -423,200 +480,233 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response, google::protobuf::Closure* done) { - [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); - brpc::ClosureGuard guard(done); - Status st = _tablet_fetch_data(request, response); - st.to_protobuf(response->mutable_status()); + DorisMetrics::instance()->tablet_fetch_data->increment(1); + _heavy_work_pool.offer([this, controller, request, response, done]() { + [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + brpc::ClosureGuard guard(done); + Status st = _tablet_fetch_data(request, response); + st.to_protobuf(response->mutable_status()); + }); } void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - // PProxyRequest is defined in gensrc/proto/internal_service.proto - // Currently it supports 2 kinds of requests: - // 1. get all kafka partition ids for given topic - // 2. get all kafka partition offsets for given topic and timestamp. - if (request->has_kafka_meta_request()) { - const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); - if (!kafka_request.partition_id_for_latest_offsets().empty()) { - // get latest offsets for specified partition ids - std::vector<PIntegerPair> partition_offsets; - Status st = _exec_env->routine_load_task_executor() - ->get_kafka_latest_offsets_for_partitions( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + DorisMetrics::instance()->get_info->increment(1); + _light_work_pool.offer([this, request, response, done]() { Review Comment: This is a heavy work, it will visit kafka broker, which is a remote service. ########## be/src/service/internal_service.cpp: ########## @@ -423,200 +480,233 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response, google::protobuf::Closure* done) { - [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); - brpc::ClosureGuard guard(done); - Status st = _tablet_fetch_data(request, response); - st.to_protobuf(response->mutable_status()); + DorisMetrics::instance()->tablet_fetch_data->increment(1); + _heavy_work_pool.offer([this, controller, request, response, done]() { + [[maybe_unused]] brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + brpc::ClosureGuard guard(done); + Status st = _tablet_fetch_data(request, response); + st.to_protobuf(response->mutable_status()); + }); } void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - // PProxyRequest is defined in gensrc/proto/internal_service.proto - // Currently it supports 2 kinds of requests: - // 1. get all kafka partition ids for given topic - // 2. get all kafka partition offsets for given topic and timestamp. - if (request->has_kafka_meta_request()) { - const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); - if (!kafka_request.partition_id_for_latest_offsets().empty()) { - // get latest offsets for specified partition ids - std::vector<PIntegerPair> partition_offsets; - Status st = _exec_env->routine_load_task_executor() - ->get_kafka_latest_offsets_for_partitions( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + DorisMetrics::instance()->get_info->increment(1); + _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + // PProxyRequest is defined in gensrc/proto/internal_service.proto + // Currently it supports 2 kinds of requests: + // 1. get all kafka partition ids for given topic + // 2. get all kafka partition offsets for given topic and timestamp. + if (request->has_kafka_meta_request()) { + const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); + if (!kafka_request.partition_id_for_latest_offsets().empty()) { + // get latest offsets for specified partition ids + std::vector<PIntegerPair> partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_latest_offsets_for_partitions( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } - } - st.to_protobuf(response->mutable_status()); - return; - } else if (!kafka_request.offset_times().empty()) { - // if offset_times() has elements, which means this request is to get offset by timestamp. - std::vector<PIntegerPair> partition_offsets; - Status st = - _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times( - request->kafka_meta_request(), &partition_offsets); - if (st.ok()) { - PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); - for (const auto& entry : partition_offsets) { - PIntegerPair* res = part_offsets->add_offset_times(); - res->set_key(entry.key()); - res->set_val(entry.val()); + st.to_protobuf(response->mutable_status()); + return; + } else if (!kafka_request.offset_times().empty()) { + // if offset_times() has elements, which means this request is to get offset by timestamp. + std::vector<PIntegerPair> partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_partition_offsets_for_times( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } } - } - st.to_protobuf(response->mutable_status()); - return; - } else { - // get partition ids of topic - std::vector<int32_t> partition_ids; - Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( - request->kafka_meta_request(), &partition_ids); - if (st.ok()) { - PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); - for (int32_t id : partition_ids) { - kafka_result->add_partition_ids(id); + st.to_protobuf(response->mutable_status()); + return; + } else { + // get partition ids of topic + std::vector<int32_t> partition_ids; + Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta( + request->kafka_meta_request(), &partition_ids); + if (st.ok()) { + PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result(); + for (int32_t id : partition_ids) { + kafka_result->add_partition_ids(id); + } } + st.to_protobuf(response->mutable_status()); + return; } - st.to_protobuf(response->mutable_status()); - return; } - } - Status::OK().to_protobuf(response->mutable_status()); + Status::OK().to_protobuf(response->mutable_status()); + }); } void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller, const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->update(request, response); + DorisMetrics::instance()->update_cache->increment(1); + _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->update(request, response); + }); } void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->fetch(request, result); + DorisMetrics::instance()->fetch_cache->increment(1); + _heavy_work_pool.offer([this, request, result, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->fetch(request, result); + }); } void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - _exec_env->result_cache()->clear(request, response); + DorisMetrics::instance()->clear_cache->increment(1); + _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->clear(request, response); + }); } void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller, const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); - Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "merge meet error" << st.to_string(); - } - st.to_protobuf(response->mutable_status()); + DorisMetrics::instance()->merge_filter->increment(1); + _light_work_pool.offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "merge meet error" << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); } void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller, const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); - butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); - UniqueId unique_id(request->query_id()); - VLOG_NOTICE << "rpc apply_filter recv"; - Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); - if (!st.ok()) { - LOG(WARNING) << "apply filter meet error: " << st.to_string(); - } - st.to_protobuf(response->mutable_status()); + DorisMetrics::instance()->apply_filter->increment(1); + _light_work_pool.offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + UniqueId unique_id(request->query_id()); + VLOG_NOTICE << "rpc apply_filter recv"; + Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "apply filter meet error: " << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); } void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - for (int i = 0; i < request->data_size(); ++i) { - PDataRow* row = new PDataRow(); - row->CopyFrom(request->data(i)); - pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row), - sizeof(row) + row->ByteSizeLong()); + DorisMetrics::instance()->send_data->increment(1); + _heavy_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + for (int i = 0; i < request->data_size(); ++i) { + PDataRow* row = new PDataRow(); + row->CopyFrom(request->data(i)); + pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row), + sizeof(row) + row->ByteSizeLong()); + } + response->mutable_status()->set_status_code(0); } - response->mutable_status()->set_status_code(0); - } + }); } void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - pipe->finish(); - response->mutable_status()->set_status_code(0); - } + DorisMetrics::instance()->commit->increment(1); + _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->finish(); + response->mutable_status()->set_status_code(0); + } + }); } void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - TUniqueId fragment_instance_id; - fragment_instance_id.hi = request->fragment_instance_id().hi(); - fragment_instance_id.lo = request->fragment_instance_id().lo(); - - auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); - if (pipe == nullptr) { - response->mutable_status()->set_status_code(1); - response->mutable_status()->add_error_msgs("pipe is null"); - } else { - pipe->cancel("rollback"); - response->mutable_status()->set_status_code(0); - } + DorisMetrics::instance()->rollback->increment(1); + _light_work_pool.offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + TUniqueId fragment_instance_id; + fragment_instance_id.hi = request->fragment_instance_id().hi(); + fragment_instance_id.lo = request->fragment_instance_id().lo(); + + auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id); + if (pipe == nullptr) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs("pipe is null"); + } else { + pipe->cancel("rollback"); + response->mutable_status()->set_status_code(0); + } + }); } -void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* cntl_base, +void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* controller, const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { - brpc::ClosureGuard closure_guard(done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); + DorisMetrics::instance()->fold_constant_expr->increment(1); + _light_work_pool.offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); - Status st = Status::OK(); - if (request->has_request()) { - st = _fold_constant_expr(request->request(), response); - } else { - // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15 - st = _fold_constant_expr(cntl->request_attachment().to_string(), response); - } - if (!st.ok()) { - LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st; - } - st.to_protobuf(response->mutable_status()); + Status st = Status::OK(); + if (request->has_request()) { + st = _fold_constant_expr(request->request(), response); + } else { + // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15 Review Comment: Remove this TODO? ########## be/src/service/internal_service.cpp: ########## @@ -1097,9 +1208,8 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro const PMultiGetRequest* request, PMultiGetResponse* response, google::protobuf::Closure* done) { - // Submit task to seperate ThreadPool for avoiding block bthread working pthread - ThreadPool* task_pool = StorageEngine::instance()->get_bg_multiget_threadpool(); - Status submit_st = task_pool->submit_func([request, response, done, this]() { + DorisMetrics::instance()->multiget_data->increment(1); + _light_work_pool.offer([request, response, done, this]() { Review Comment: This is a heavy work, because it will open and read segment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org