This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 613395ee42872830e10a5b2ec355246a8f1b58e9 Author: Pxl <pxl...@qq.com> AuthorDate: Thu Aug 3 22:02:37 2023 +0800 [Chore](brpc) make error messages more verbose when brpc pool offer failed (#22558) --- be/src/service/internal_service.cpp | 72 ++++++++++++++++----------------- be/src/util/blocking_priority_queue.hpp | 2 + be/src/util/priority_thread_pool.hpp | 9 ++++- 3 files changed, 46 insertions(+), 37 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 1f62a5bc9a..821e937fca 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -172,17 +172,17 @@ template <typename T> concept CanCancel = requires(T* response) { response->mutable_status(); }; template <CanCancel T> -void offer_failed(T* response, google::protobuf::Closure* done, const std::string& pool_name) { +void offer_failed(T* response, google::protobuf::Closure* done, const PriorityThreadPool& pool) { brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(TStatusCode::CANCELLED); response->mutable_status()->add_error_msgs("fail to offer request to the work pool, pool=" + - pool_name); + pool.get_info()); } template <typename T> -void offer_failed(T* response, google::protobuf::Closure* done, const std::string& pool_name) { +void offer_failed(T* response, google::protobuf::Closure* done, const PriorityThreadPool& pool) { brpc::ClosureGuard closure_guard(done); - LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool_name; + LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info(); } PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env) @@ -272,7 +272,7 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c st.to_protobuf(response->mutable_status()); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -284,7 +284,7 @@ void 
PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c _exec_plan_fragment_in_pthread(controller, request, response, done); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -320,7 +320,7 @@ void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcContr _exec_plan_fragment_in_pthread(controller, request, response, done); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -336,7 +336,7 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl st.to_protobuf(result->mutable_status()); }); if (!ret) { - offer_failed(result, done, _light_work_pool.get_name()); + offer_failed(result, done, _light_work_pool); } } @@ -348,7 +348,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll _tablet_writer_add_block(controller, request, response, done); }); if (!ret) { - offer_failed(response, done, _heavy_work_pool.get_name()); + offer_failed(response, done, _heavy_work_pool); } } @@ -369,7 +369,7 @@ void PInternalServiceImpl::tablet_writer_add_block_by_http( } }); if (!ret) { - offer_failed(response, done, _heavy_work_pool.get_name()); + offer_failed(response, done, _heavy_work_pool); } } @@ -398,7 +398,7 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); }); if (!ret) { - offer_failed(response, done, _heavy_work_pool.get_name()); + offer_failed(response, done, _heavy_work_pool); } } @@ -419,7 +419,7 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* } }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -496,7 +496,7 @@ void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* st.to_protobuf(result->mutable_status()); }); if (!ret) { - offer_failed(result, done, _light_work_pool.get_name()); + offer_failed(result, done, _light_work_pool); } } @@ -509,7 +509,7 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controlle _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); }); if (!ret) { - offer_failed(result, done, _heavy_work_pool.get_name()); + offer_failed(result, done, _heavy_work_pool); } } @@ -613,7 +613,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c st.to_protobuf(result->mutable_status()); }); if (!ret) { - offer_failed(result, done, _heavy_work_pool.get_name()); + offer_failed(result, done, _heavy_work_pool); } } @@ -640,7 +640,7 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co st.to_protobuf(response->mutable_status()); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -652,7 +652,7 @@ void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcCon _get_column_ids_by_tablet_ids(controller, request, response, done); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -768,7 +768,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, Status::OK().to_protobuf(response->mutable_status()); }); if (!ret) { - offer_failed(response, done, _heavy_work_pool.get_name()); + offer_failed(response, done, _heavy_work_pool); } } @@ -780,7 +780,7 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control _exec_env->result_cache()->update(request, response); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -792,7 +792,7 @@ void 
PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controll _exec_env->result_cache()->fetch(request, result); }); if (!ret) { - offer_failed(result, done, _heavy_work_pool.get_name()); + offer_failed(result, done, _heavy_work_pool); } } @@ -804,7 +804,7 @@ void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controll _exec_env->result_cache()->clear(request, response); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -823,7 +823,7 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr st.to_protobuf(response->mutable_status()); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -844,7 +844,7 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr st.to_protobuf(response->mutable_status()); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -865,7 +865,7 @@ void PInternalServiceImpl::apply_filterv2(::google::protobuf::RpcController* con st.to_protobuf(response->mutable_status()); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -898,7 +898,7 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller } }); if (!ret) { - offer_failed(response, done, _heavy_work_pool.get_name()); + offer_failed(response, done, _heavy_work_pool); } } @@ -921,7 +921,7 @@ void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, } }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -943,7 +943,7 @@ void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, } }); if (!ret) { - offer_failed(response, done, 
_light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -961,7 +961,7 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* c st.to_protobuf(response->mutable_status()); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -988,7 +988,7 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr _transmit_block(controller, request, response, done, Status::OK()); }); if (!ret) { - offer_failed(response, done, pool.get_name()); + offer_failed(response, done, pool); } } @@ -1006,7 +1006,7 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle _transmit_block(controller, new_request, response, new_done, st); }); if (!ret) { - offer_failed(response, done, _heavy_work_pool.get_name()); + offer_failed(response, done, _heavy_work_pool); } } @@ -1072,7 +1072,7 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co } }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -1110,7 +1110,7 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co } }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -1126,7 +1126,7 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controlle response->mutable_status()->set_status_code(0); }); if (!ret) { - offer_failed(response, done, _light_work_pool.get_name()); + offer_failed(response, done, _light_work_pool); } } @@ -1344,7 +1344,7 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset( rowset_meta->tablet_id(), node_id, true); }); if (!ret) { - offer_failed(response, done, _heavy_work_pool.get_name()); + offer_failed(response, done, _heavy_work_pool); } Status::OK().to_protobuf(response->mutable_status()); } 
@@ -1415,7 +1415,7 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset( Status::OK().to_protobuf(response->mutable_status()); }); if (!ret) { - offer_failed(response, done, _heavy_work_pool.get_name()); + offer_failed(response, done, _heavy_work_pool); } } @@ -1588,7 +1588,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; }); if (!ret) { - offer_failed(response, done, _heavy_work_pool.get_name()); + offer_failed(response, done, _heavy_work_pool); } } diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index 57e716060f..41196c5cfb 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -185,6 +185,8 @@ public: return _queue.size(); } + uint32_t get_max_size() const { return _max_element; } + // Returns the total amount of time threads have blocked in blocking_get. uint64_t total_get_wait_time() const { return _total_get_wait_time; } diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp index 73685bcf7c..8f648d9d37 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/priority_thread_pool.hpp @@ -122,7 +122,14 @@ public: join(); } - std::string get_name() const { return _name; } + std::string get_info() const { + return fmt::format( + "PriorityThreadPool(name={}, queue_size={}/{}, active_thread={}/{}, " + "total_get_wait_time={}, total_put_wait_time={})", + _name, get_queue_size(), _work_queue.get_size(), _work_queue.get_max_size(), + _active_threads, _threads.size(), _work_queue.total_get_wait_time(), + _work_queue.total_put_wait_time()); + } protected: virtual bool is_shutdown() { return _shutdown; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org