This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 09827b8e563 [fix](doris catalog) FragmentMgr should not cancel virtual doris cluster query (#62135) (#62281)
09827b8e563 is described below

commit 09827b8e56309f3b93e142d92596e7efbd0f36d6
Author: HonestManXin <[email protected]>
AuthorDate: Fri Apr 10 11:00:30 2026 +0800

    [fix](doris catalog) FragmentMgr should not cancel virtual doris cluster query (#62135) (#62281)
    
    Cherry-picked from https://github.com/apache/doris/pull/62135
---
 be/src/runtime/fragment_mgr.cpp                    |  18 +-
 be/src/runtime/query_context.cpp                   |   2 +
 be/src/runtime/query_context.h                     |   3 +-
 .../fragment_mgr_cross_cluster_cancel_test.cpp     | 221 +++++++++++++++++++++
 4 files changed, 240 insertions(+), 4 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6bf60dbdfc5..614bd09c871 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -737,9 +737,16 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
 
                         // This may be a first fragment request of the query.
                         // Create the query fragments context.
-                        query_ctx = QueryContext::create(query_id, _exec_env, params.query_options,
-                                                         params.coord, params.is_nereids,
-                                                         params.current_connect_fe, query_source);
+                        // Cross-cluster query: coordinator FE may not belong to local cluster.
+                        // In that case, cancel_worker() should not cancel it based on local FE liveness.
+                        QuerySource actual_query_source = query_source;
+                        if (query_source == QuerySource::INTERNAL_FRONTEND &&
+                            !_exec_env->get_running_frontends().contains(params.coord)) {
+                            actual_query_source = QuerySource::EXTERNAL_FRONTEND;
+                        }
+                        query_ctx = QueryContext::create(
+                                query_id, _exec_env, params.query_options, params.coord,
+                                params.is_nereids, params.current_connect_fe, actual_query_source);
                         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
                         RETURN_IF_ERROR(DescriptorTbl::create(
                                 &(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
@@ -1024,6 +1031,11 @@ void FragmentMgr::cancel_worker() {
                                              -> Status {
                     for (const auto& it : map) {
                         if (auto q_ctx = it.second.lock()) {
+                            // Cross-cluster query: coordinator FE is not in local `running_fes`,
+                            // we should not cancel it based on local coordinator liveness.
+                            if (q_ctx->get_query_source() == QuerySource::EXTERNAL_FRONTEND) {
+                                continue;
+                            }
                             q_contexts.push_back(q_ctx);
                             const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
 
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 424085ea1ac..e4bbeef60dd 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -72,6 +72,8 @@ const std::string toString(QuerySource queryType) {
         return "ROUTINE_LOAD";
     case QuerySource::EXTERNAL_CONNECTOR:
         return "EXTERNAL_CONNECTOR";
+    case QuerySource::EXTERNAL_FRONTEND:
+        return "EXTERNAL_FRONTEND";
     default:
         return "UNKNOWN";
     }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 6ec5664f8c0..7ab9d700da2 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -70,7 +70,8 @@ enum class QuerySource {
     STREAM_LOAD,
     GROUP_COMMIT_LOAD,
     ROUTINE_LOAD,
-    EXTERNAL_CONNECTOR
+    EXTERNAL_CONNECTOR,
+    EXTERNAL_FRONTEND
 };
 
 const std::string toString(QuerySource query_source);
diff --git a/be/test/runtime/fragment_mgr_cross_cluster_cancel_test.cpp b/be/test/runtime/fragment_mgr_cross_cluster_cancel_test.cpp
new file mode 100644
index 00000000000..669e9da10e4
--- /dev/null
+++ b/be/test/runtime/fragment_mgr_cross_cluster_cancel_test.cpp
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/HeartbeatService_types.h>
+#include <gen_cpp/PaloInternalService_types.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <thread>
+
+#include "common/config.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/workload_group/workload_group_manager.h"
+
+namespace doris {
+
+class FragmentMgrCrossClusterCancelTest : public testing::Test {
+public:
+    void SetUp() override {
+        _origin_cancel_worker_interval_seconds =
+                config::fragment_mgr_cancel_worker_interval_seconds;
+        // Make cancel_worker run quickly in UT, and restore it in TearDown.
+        config::fragment_mgr_cancel_worker_interval_seconds = 1;
+
+        // Make frontends list deterministic for both this ExecEnv instance and global ExecEnv.
+        _origin_global_frontends = ExecEnv::GetInstance()->get_frontends();
+        ExecEnv::GetInstance()->update_frontends({});
+        _exec_env.update_frontends({});
+
+        _exec_env._workload_group_manager = new WorkloadGroupMgr();
+        // Ensure there is a "normal" workload group, otherwise WorkloadGroupMgr::get_group() will throw.
+        WorkloadGroupInfo normal_wg_info {.id = 1, .name = "normal"};
+        _exec_env._workload_group_manager->get_or_create_workload_group(normal_wg_info);
+
+        _exec_env._fragment_mgr = new FragmentMgr(&_exec_env);
+    }
+
+    void TearDown() override {
+        if (_exec_env._fragment_mgr != nullptr) {
+            _exec_env._fragment_mgr->stop();
+        }
+        delete _exec_env._fragment_mgr;
+        _exec_env._fragment_mgr = nullptr;
+        delete _exec_env._workload_group_manager;
+        _exec_env._workload_group_manager = nullptr;
+        _exec_env.update_frontends({});
+
+        ExecEnv::GetInstance()->update_frontends(_origin_global_frontends);
+        config::fragment_mgr_cancel_worker_interval_seconds =
+                _origin_cancel_worker_interval_seconds;
+    }
+
+protected:
+    static TDescriptorTable _make_min_desc_tbl() {
+        TDescriptorTableBuilder dtb;
+        TTupleDescriptorBuilder tuple_builder;
+        tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                       .type(TYPE_INT)
+                                       .nullable(true)
+                                       .column_name("c1")
+                                       .column_pos(1)
+                                       .build());
+        tuple_builder.build(&dtb);
+        return dtb.desc_tbl();
+    }
+
+    static TQueryOptions _make_min_query_options(int64_t fe_process_uuid) {
+        TQueryOptions query_options;
+        query_options.__set_query_type(TQueryType::SELECT);
+        query_options.__set_execution_timeout(60);
+        query_options.__set_query_timeout(60);
+        query_options.__set_mem_limit(64L * 1024 * 1024);
+        query_options.__set_fe_process_uuid(fe_process_uuid);
+        return query_options;
+    }
+
+    ExecEnv _exec_env;
+    int32_t _origin_cancel_worker_interval_seconds = 0;
+    std::vector<TFrontendInfo> _origin_global_frontends;
+};
+
+TEST_F(FragmentMgrCrossClusterCancelTest,
+       MarkQuerySourceAsExternalFrontendWhenCoordinatorNotLocal) {
+    auto* fragment_mgr = _exec_env.fragment_mgr();
+    ASSERT_NE(fragment_mgr, nullptr);
+
+    TUniqueId query_id;
+    query_id.__set_hi(1);
+    query_id.__set_lo(2);
+
+    TNetworkAddress coord;
+    coord.hostname = "fe-a";
+    coord.port = 9030;
+
+    TPipelineFragmentParams params;
+    params.__set_query_id(query_id);
+    params.__set_is_simplified_param(false);
+    params.__set_coord(coord);
+    params.__set_is_nereids(false);
+    params.__set_current_connect_fe(coord);
+    params.__set_fragment_num_on_host(1);
+    params.__set_query_options(_make_min_query_options(/*fe_process_uuid*/ 123));
+    params.__set_desc_tbl(_make_min_desc_tbl());
+
+    std::shared_ptr<QueryContext> query_ctx;
+    TPipelineFragmentParamsList parent;
+    auto st = fragment_mgr->_get_or_create_query_ctx(params, parent, QuerySource::INTERNAL_FRONTEND,
+                                                     query_ctx);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_NE(query_ctx, nullptr);
+    EXPECT_EQ(query_ctx->get_query_source(), QuerySource::EXTERNAL_FRONTEND);
+}
+
+TEST_F(FragmentMgrCrossClusterCancelTest, CancelWorkerSkipsExternalFrontendQuery) {
+    auto* fragment_mgr = _exec_env.fragment_mgr();
+    ASSERT_NE(fragment_mgr, nullptr);
+
+    // Make global running frontends non-empty so cancel_worker executes the invalid-query check path.
+    // NOTE: cancel_worker uses ExecEnv::GetInstance()->get_running_frontends().
+    TFrontendInfo global_fe;
+    global_fe.coordinator_address.hostname = "fe-global";
+    global_fe.coordinator_address.port = 9030;
+    global_fe.process_uuid = 777;
+    ExecEnv::GetInstance()->update_frontends({global_fe});
+
+    // Make local running frontends contain `coord_internal` so this query remains INTERNAL_FRONTEND.
+    // NOTE: _get_or_create_query_ctx uses `_exec_env->get_running_frontends()`.
+    TNetworkAddress coord_internal;
+    coord_internal.hostname = "fe-local";
+    coord_internal.port = 9030;
+
+    TFrontendInfo local_fe;
+    local_fe.coordinator_address = coord_internal;
+    local_fe.process_uuid = 999;
+    _exec_env.update_frontends({local_fe});
+
+    // Create an INTERNAL_FRONTEND query (should be cancelled by cancel_worker when coordinator not found).
+    TUniqueId internal_query_id;
+    internal_query_id.__set_hi(3);
+    internal_query_id.__set_lo(4);
+
+    TPipelineFragmentParams internal_params;
+    internal_params.__set_query_id(internal_query_id);
+    internal_params.__set_is_simplified_param(false);
+    internal_params.__set_coord(coord_internal);
+    internal_params.__set_is_nereids(false);
+    internal_params.__set_current_connect_fe(coord_internal);
+    internal_params.__set_fragment_num_on_host(1);
+    internal_params.__set_query_options(_make_min_query_options(/*fe_process_uuid*/ 111));
+    internal_params.__set_desc_tbl(_make_min_desc_tbl());
+
+    std::shared_ptr<QueryContext> internal_query_ctx;
+    TPipelineFragmentParamsList parent;
+    auto st = fragment_mgr->_get_or_create_query_ctx(
+            internal_params, parent, QuerySource::INTERNAL_FRONTEND, internal_query_ctx);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_NE(internal_query_ctx, nullptr);
+    ASSERT_EQ(internal_query_ctx->get_query_source(), QuerySource::INTERNAL_FRONTEND);
+
+    // Create a cross-cluster query (coordinator not in local frontends), it should be marked as EXTERNAL_FRONTEND.
+    TUniqueId external_query_id;
+    external_query_id.__set_hi(5);
+    external_query_id.__set_lo(6);
+
+    TNetworkAddress coord_external;
+    coord_external.hostname = "fe-remote";
+    coord_external.port = 9030;
+
+    TPipelineFragmentParams external_params;
+    external_params.__set_query_id(external_query_id);
+    external_params.__set_is_simplified_param(false);
+    external_params.__set_coord(coord_external);
+    external_params.__set_is_nereids(false);
+    external_params.__set_current_connect_fe(coord_external);
+    external_params.__set_fragment_num_on_host(1);
+    external_params.__set_query_options(_make_min_query_options(/*fe_process_uuid*/ 222));
+    external_params.__set_desc_tbl(_make_min_desc_tbl());
+
+    std::shared_ptr<QueryContext> external_query_ctx;
+    st = fragment_mgr->_get_or_create_query_ctx(external_params, parent,
+                                                QuerySource::INTERNAL_FRONTEND, external_query_ctx);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_NE(external_query_ctx, nullptr);
+    ASSERT_EQ(external_query_ctx->get_query_source(), QuerySource::EXTERNAL_FRONTEND);
+
+    // Wait for background cancel_worker to cancel the INTERNAL_FRONTEND query,
+    // and keep the EXTERNAL_FRONTEND query alive.
+    // NOTE: In BE_TEST, FragmentMgr::remove_query_context() does not erase `_query_ctx_map`,
+    // so we validate by `is_cancelled()` instead of expecting get_query_ctx() == nullptr.
+    constexpr int kMaxWaitMs = 5000;
+    int waited_ms = 0;
+    while (waited_ms < kMaxWaitMs) {
+        if (internal_query_ctx->is_cancelled()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        waited_ms += 100;
+    }
+
+    EXPECT_TRUE(internal_query_ctx->is_cancelled());
+    EXPECT_FALSE(external_query_ctx->is_cancelled());
+    EXPECT_EQ(external_query_ctx->get_query_source(), QuerySource::EXTERNAL_FRONTEND);
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to