This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 72a08d51374 [minor](retry) Increase rpc retries in query callback (#49635)
72a08d51374 is described below

commit 72a08d51374b1400cf0b842fdb7b5ae101114318
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Mon Mar 31 10:16:12 2025 +0800

    [minor](retry) Increase rpc retries in query callback (#49635)
    
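    When a query finishes, it reports its state to the FE via RPC. If that
    report fails because of a network problem, the query is considered
    failed. This PR adds a retry loop to the callback: opening the connection
    to the coordinator is retried up to 10 times, with a 1-second sleep after
    each failed attempt.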
---
 be/src/runtime/fragment_mgr.cpp | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 876d221284a..48078411ae7 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -371,10 +371,19 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
         // External query (flink/spark read tablets) not need to report to FE.
         return;
     }
+    int callback_retries = 10;
+    const int sleep_ms = 1000;
     Status exec_status = req.status;
     Status coord_status;
-    FrontendServiceConnection coord(_exec_env->frontend_client_cache(), 
req.coord_addr,
-                                    &coord_status);
+    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
+    do {
+        coord = 
std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
+                                                            req.coord_addr, 
&coord_status);
+        if (!coord_status.ok()) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
+        }
+    } while (!coord_status.ok() && callback_retries-- > 0);
+
     if (!coord_status.ok()) {
         std::stringstream ss;
         UniqueId uid(req.query_id.hi, req.query_id.lo);
@@ -570,21 +579,21 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
     }
     try {
         try {
-            coord->reportExecStatus(res, params);
+            (*coord)->reportExecStatus(res, params);
         } catch ([[maybe_unused]] TTransportException& e) {
 #ifndef ADDRESS_SANITIZER
             LOG(WARNING) << "Retrying ReportExecStatus. query id: " << 
print_id(req.query_id)
                          << ", instance id: " << 
print_id(req.fragment_instance_id) << " to "
                          << req.coord_addr << ", err: " << e.what();
 #endif
-            rpc_status = coord.reopen();
+            rpc_status = coord->reopen();
 
             if (!rpc_status.ok()) {
                 // we need to cancel the execution of this fragment
                 req.cancel_fn(rpc_status);
                 return;
             }
-            coord->reportExecStatus(res, params);
+            (*coord)->reportExecStatus(res, params);
         }
 
         rpc_status = Status::create<false>(res.status);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to