This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 72a08d51374 [minor](retry) Increase rpc retries in query callback (#49635) 72a08d51374 is described below commit 72a08d51374b1400cf0b842fdb7b5ae101114318 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Mon Mar 31 10:16:12 2025 +0800 [minor](retry) Increase rpc retries in query callback (#49635) When a query finishes, it reports its state to the FE by RPC. If that report fails because of a network error, the query is considered a failure. This PR introduces retries for this reporting in the callback. --- be/src/runtime/fragment_mgr.cpp | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 876d221284a..48078411ae7 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -371,10 +371,19 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { // External query (flink/spark read tablets) not need to report to FE. 
return; } + int callback_retries = 10; + const int sleep_ms = 1000; Status exec_status = req.status; Status coord_status; - FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr, - &coord_status); + std::unique_ptr<FrontendServiceConnection> coord = nullptr; + do { + coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(), + req.coord_addr, &coord_status); + if (!coord_status.ok()) { + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + } + } while (!coord_status.ok() && callback_retries-- > 0); + if (!coord_status.ok()) { std::stringstream ss; UniqueId uid(req.query_id.hi, req.query_id.lo); @@ -570,21 +579,21 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { } try { try { - coord->reportExecStatus(res, params); + (*coord)->reportExecStatus(res, params); } catch ([[maybe_unused]] TTransportException& e) { #ifndef ADDRESS_SANITIZER LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id) << ", instance id: " << print_id(req.fragment_instance_id) << " to " << req.coord_addr << ", err: " << e.what(); #endif - rpc_status = coord.reopen(); + rpc_status = coord->reopen(); if (!rpc_status.ok()) { // we need to cancel the execution of this fragment req.cancel_fn(rpc_status); return; } - coord->reportExecStatus(res, params); + (*coord)->reportExecStatus(res, params); } rpc_status = Status::create<false>(res.status); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org