This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 035027f8315 [fix](query cancel) Fix query is cancelled when it comes 
from follower FE #37662 (#37707)
035027f8315 is described below

commit 035027f8315ff26a1c5168e71f43fb26f14edc68
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Fri Jul 12 15:50:45 2024 +0800

    [fix](query cancel) Fix query is cancelled when it comes from follower FE 
#37662 (#37707)
    
    cherry pick from #37662
---
 be/src/runtime/fragment_mgr.cpp | 42 +++++++++++++++++++++++++++++++++++------
 1 file changed, 36 insertions(+), 6 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fee9d51afba..c2f16e1d05a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -37,6 +37,7 @@
 #include <thrift/protocol/TDebugProtocol.h>
 #include <thrift/transport/TTransportException.h>
 
+#include <algorithm>
 #include <atomic>
 
 #include "common/status.h"
@@ -1173,21 +1174,50 @@ void FragmentMgr::cancel_worker() {
                         continue;
                     }
 
-                    auto itr = running_fes.find(q.second->coord_addr);
+                    auto query_context = q.second;
+
+                    auto itr = running_fes.find(query_context->coord_addr);
                     if (itr != running_fes.end()) {
-                        if (q.second->get_fe_process_uuid() == 
itr->second.info.process_uuid ||
+                        if (query_context->get_fe_process_uuid() == 
itr->second.info.process_uuid ||
                             itr->second.info.process_uuid == 0) {
                             continue;
                         } else {
-                            LOG_WARNING("Coordinator of query {} restarted, 
going to cancel it.",
-                                        print_id(q.second->query_id()));
+                            // In some rare cases, the rpc port of a follower is 
not updated in time,
+                            // so the port of this follower will be zero, 
but actually it is still running,
+                            // and the BE has already received the query from 
that follower.
+                            // So we need to check if host is in running_fes.
+                            bool fe_host_is_standing = std::any_of(
+                                    running_fes.begin(), running_fes.end(),
+                                    [query_context](const auto& fe) {
+                                        return fe.first.hostname ==
+                                                       
query_context->coord_addr.hostname &&
+                                               fe.first.port == 0;
+                                    });
+                            if (fe_host_is_standing) {
+                                LOG_WARNING(
+                                        "Coordinator {}:{} is not found, but 
its host is still "
+                                        "running with an unstable brpc port, 
not going to cancel "
+                                        "it.",
+                                        query_context->coord_addr.hostname,
+                                        query_context->coord_addr.port,
+                                        print_id(query_context->query_id()));
+                                continue;
+                            } else {
+                                LOG_WARNING(
+                                        "Could not find target coordinator 
{}:{} of query {}, "
+                                        "going to "
+                                        "cancel it.",
+                                        query_context->coord_addr.hostname,
+                                        query_context->coord_addr.port,
+                                        print_id(query_context->query_id()));
+                            }
                         }
                     } else {
                         LOG_WARNING(
                                 "Could not find target coordinator {}:{} of 
query {}, going to "
                                 "cancel it.",
-                                q.second->coord_addr.hostname, 
q.second->coord_addr.port,
-                                print_id(q.second->query_id()));
+                                query_context->coord_addr.hostname, 
query_context->coord_addr.port,
+                                print_id(query_context->query_id()));
                     }
 
                     // Coordinator of this query is already dead.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to