This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c083336bbe [Improvement](pipeline) Cancel outdated query if original 
fe restarts (#23582)
c083336bbe is described below

commit c083336bbee1ab2a07a544e5f25a226cd5c28302
Author: hzq <seuhezhiqi...@163.com>
AuthorDate: Thu Aug 31 17:58:52 2023 +0800

    [Improvement](pipeline) Cancel outdated query if original fe restarts 
(#23582)
    
    If any FE restarts, queries that were emitted from this FE will be cancelled.
    
    Implementation of #23704
---
 be/src/agent/heartbeat_server.cpp                  |   8 +-
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   4 +
 be/src/pipeline/pipeline_fragment_context.cpp      |   9 +-
 be/src/pipeline/pipeline_fragment_context.h        |   2 +
 be/src/pipeline/task_scheduler.cpp                 |   8 +-
 be/src/runtime/exec_env.cpp                        | 105 ++++++++++++
 be/src/runtime/exec_env.h                          |  10 ++
 be/src/runtime/external_scan_context_mgr.cpp       |   6 +-
 be/src/runtime/fragment_mgr.cpp                    | 180 +++++++++++++++------
 be/src/runtime/fragment_mgr.h                      |  31 +++-
 be/src/runtime/{exec_env.cpp => frontend_info.h}   |  15 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp   |   2 +-
 be/src/runtime/query_context.h                     |   2 +
 be/src/runtime/runtime_state.h                     |   5 +-
 be/src/service/backend_service.cpp                 |   3 +-
 be/src/service/internal_service.cpp                |   5 +-
 be/src/util/debug_util.cpp                         |  33 ++++
 be/src/util/debug_util.h                           |   9 ++
 .../main/java/org/apache/doris/catalog/Env.java    |  14 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |   2 +
 .../java/org/apache/doris/service/ExecuteEnv.java  |   8 +-
 .../apache/doris/service/FrontendServiceImpl.java  |   3 +-
 .../java/org/apache/doris/system/Frontend.java     |  14 +-
 .../apache/doris/system/FrontendHbResponse.java    |  14 +-
 .../java/org/apache/doris/system/HeartbeatMgr.java |  13 +-
 gensrc/thrift/HeartbeatService.thrift              |   6 +
 gensrc/thrift/PaloInternalService.thrift           |   3 +
 28 files changed, 423 insertions(+), 93 deletions(-)

diff --git a/be/src/agent/heartbeat_server.cpp 
b/be/src/agent/heartbeat_server.cpp
index 855be182ff..71e8b0adcb 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -62,6 +62,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& 
heartbeat_result,
                           << "host:" << master_info.network_address.hostname
                           << ", port:" << master_info.network_address.port
                           << ", cluster id:" << master_info.cluster_id
+                          << ", frontend_info:" << 
PrintFrontendInfos(master_info.frontend_infos)
                           << ", counter:" << google::COUNTER << ", BE start 
time: " << _be_epoch;
 
     MonotonicStopWatch watch;
@@ -97,7 +98,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& 
master_info) {
         _master_info->cluster_id = master_info.cluster_id;
         LOG(INFO) << "record cluster id. host: " << 
master_info.network_address.hostname
                   << ". port: " << master_info.network_address.port
-                  << ". cluster id: " << master_info.cluster_id;
+                  << ". cluster id: " << master_info.cluster_id
+                  << ". frontend_infos: " << 
PrintFrontendInfos(master_info.frontend_infos);
     } else {
         if (_master_info->cluster_id != master_info.cluster_id) {
             return Status::InternalError("invalid cluster id. ignore. 
cluster_id={}",
@@ -210,6 +212,10 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& 
master_info) {
         _master_info->__set_backend_id(master_info.backend_id);
     }
 
+    if (master_info.__isset.frontend_infos) {
+        ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos);
+    }
+
     if (need_report) {
         LOG(INFO) << "Master FE is changed or restarted. report tablet and 
disk info immediately";
         _olap_engine->notify_listeners();
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 3ada8cb82f..873db8fd67 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1078,6 +1078,8 @@ DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
 
 DEFINE_Int32(partition_topn_partition_threshold, "1024");
 
+DEFINE_Int32(fe_expire_duration_seconds, "60");
+
 #ifdef BE_TEST
 // test s3
 DEFINE_String(test_s3_resource, "resource");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index beca3957ef..9c452c8cfc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1137,6 +1137,10 @@ DECLARE_mString(user_files_secure_path);
 // and if this threshold is exceeded, the remaining data will be pass through 
to other node directly.
 DECLARE_Int32(partition_topn_partition_threshold);
 
+// If an fe's frontend info has not been updated for more than fe_expire_duration_seconds,
+// that fe is regarded as abnormal, and the be will cancel the queries it coordinates.
+DECLARE_Int32(fe_expire_duration_seconds);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 10a6518ff1..97689ed001 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -151,7 +151,10 @@ PipelineFragmentContext::~PipelineFragmentContext() {
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
+        LOG(WARNING) << "PipelineFragmentContext "
+                     << PrintInstanceStandardInfo(_query_id, _fragment_id, 
_fragment_instance_id)
+                     << " is canceled, cancel message: " << msg;
+
         // Get pipe from new load stream manager and send cancel to it or the 
fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
         auto stream_load_ctx = 
_exec_env->new_load_stream_mgr()->get(_query_id);
@@ -194,8 +197,8 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     }
 
     LOG_INFO("PipelineFragmentContext::prepare")
-            .tag("query_id", _query_id)
-            .tag("instance_id", local_params.fragment_instance_id)
+            .tag("query_id", print_id(_query_id))
+            .tag("instance_id", print_id(local_params.fragment_instance_id))
             .tag("backend_num", local_params.backend_num)
             .tag("pthread_id", (uintptr_t)pthread_self());
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index de3451d11a..4b35c206e5 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -101,6 +101,8 @@ public:
 
     TUniqueId get_query_id() const { return _query_id; }
 
+    [[nodiscard]] int get_fragment_id() const { return _fragment_id; }
+
     void close_a_pipeline();
 
     std::string to_http_path(const std::string& file_name);
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 11d20ac7a9..8b2f8e3f0c 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -36,6 +36,7 @@
 #include "pipeline/task_queue.h"
 #include "pipeline_fragment_context.h"
 #include "runtime/query_context.h"
+#include "util/debug_util.h"
 #include "util/sse_util.hpp"
 #include "util/thread.h"
 #include "util/threadpool.h"
@@ -269,7 +270,12 @@ void TaskScheduler::_do_work(size_t index) {
         task->set_previous_core_id(index);
         if (!status.ok()) {
             task->set_eos_time();
-            LOG(WARNING) << fmt::format("Pipeline task failed. reason: {}", 
status.to_string());
+            LOG(WARNING) << fmt::format(
+                    "Pipeline task failed. query_id: {} reason: {}",
+                    
PrintInstanceStandardInfo(task->query_context()->query_id(),
+                                              
task->fragment_context()->get_fragment_id(),
+                                              
task->fragment_context()->get_fragment_instance_id()),
+                    status.to_string());
             // Print detail informations below when you debugging here.
             //
             // LOG(WARNING)<< "task:\n"<<task->debug_string();
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 8f63d6fe6a..333ba4f97a 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -19,6 +19,15 @@
 
 #include <gen_cpp/HeartbeatService_types.h>
 
+#include <mutex>
+#include <utility>
+
+#include "common/config.h"
+#include "runtime/frontend_info.h"
+#include "time.h"
+#include "util/debug_util.h"
+#include "util/time.h"
+
 namespace doris {
 
 ExecEnv::ExecEnv() : _is_init(false) {}
@@ -28,4 +37,100 @@ ExecEnv::~ExecEnv() {}
 const std::string& ExecEnv::token() const {
     return _master_info->token;
 }
+
+std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_frontends() {
+    std::lock_guard<std::mutex> lg(_frontends_lock);
+    return _frontends;
+}
+
+void ExecEnv::update_frontends(const std::vector<TFrontendInfo>& new_fe_infos) 
{
+    std::lock_guard<std::mutex> lg(_frontends_lock);
+
+    std::set<TNetworkAddress> dropped_fes;
+
+    for (const auto& cur_fe : _frontends) {
+        dropped_fes.insert(cur_fe.first);
+    }
+
+    for (const auto& coming_fe_info : new_fe_infos) {
+        auto itr = _frontends.find(coming_fe_info.coordinator_address);
+
+        if (itr == _frontends.end()) {
+            LOG(INFO) << "A completely new frontend, " << 
PrintFrontendInfo(coming_fe_info);
+
+            _frontends.insert(std::pair<TNetworkAddress, FrontendInfo>(
+                    coming_fe_info.coordinator_address,
+                    FrontendInfo {coming_fe_info, GetCurrentTimeMicros() / 
1000, /*first time*/
+                                  GetCurrentTimeMicros() / 1000 /*last 
time*/}));
+
+            continue;
+        }
+
+        dropped_fes.erase(coming_fe_info.coordinator_address);
+
+        if (coming_fe_info.process_uuid == 0) {
+            LOG(WARNING) << "Frontend " << PrintFrontendInfo(coming_fe_info)
+                         << " is in an unknown state.";
+        }
+
+        if (coming_fe_info.process_uuid == itr->second.info.process_uuid) {
+            itr->second.last_reveiving_time_ms = GetCurrentTimeMicros() / 1000;
+            continue;
+        }
+
+        // If we get here, means this frontend has already restarted.
+        itr->second.info.process_uuid = coming_fe_info.process_uuid;
+        itr->second.first_receiving_time_ms = GetCurrentTimeMicros() / 1000;
+        itr->second.last_reveiving_time_ms = GetCurrentTimeMicros() / 1000;
+        LOG(INFO) << "Update frontend " << PrintFrontendInfo(coming_fe_info);
+    }
+
+    for (const auto& dropped_fe : dropped_fes) {
+        LOG(INFO) << "Frontend " << PrintThriftNetworkAddress(dropped_fe)
+                  << " has already been dropped, remove it";
+        _frontends.erase(dropped_fe);
+    }
+}
+
+std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_running_frontends() {
+    std::lock_guard<std::mutex> lg(_frontends_lock);
+    std::map<TNetworkAddress, FrontendInfo> res;
+    const int expired_duration = config::fe_expire_duration_seconds * 1000;
+    const auto now = GetCurrentTimeMicros() / 1000;
+
+    for (const auto& pair : _frontends) {
+        if (pair.second.info.process_uuid != 0) {
+            if (now - pair.second.last_reveiving_time_ms < expired_duration) {
+                // If fe info has just been update in last expired_duration, 
regard it as running.
+                res[pair.first] = pair.second;
+            } else {
+                // Fe info has not been udpate for more than expired_duration, 
regard it as an abnormal.
+                // Abnormal means this fe can not connect to master, and it is 
not dropped from cluster.
+                // or fe do not have master yet.
+                LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info)
+                          << " has not update its hb "
+                          << "for more than " << 
config::fe_expire_duration_seconds
+                          << " secs, regard it as abnormal.";
+            }
+
+            continue;
+        }
+
+        if (pair.second.last_reveiving_time_ms - 
pair.second.first_receiving_time_ms >
+            expired_duration) {
+            // A zero process-uuid that sustains more than 60 seconds(default).
+            // We will regard this fe as a abnormal frontend.
+            LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info)
+                      << " has not update its hb "
+                      << "for more than " << config::fe_expire_duration_seconds
+                      << " secs, regard it as abnormal.";
+            continue;
+        } else {
+            res[pair.first] = pair.second;
+        }
+    }
+
+    return res;
+}
+
 } // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 18dd926ba6..997be48974 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -17,11 +17,13 @@
 
 #pragma once
 
+#include <gen_cpp/HeartbeatService_types.h>
 #include <stddef.h>
 
 #include <algorithm>
 #include <map>
 #include <memory>
+#include <mutex>
 #include <shared_mutex>
 #include <string>
 #include <unordered_map>
@@ -34,6 +36,7 @@
 #include "vec/common/hash_table/phmap_fwd_decl.h"
 
 namespace doris {
+struct FrontendInfo;
 namespace vectorized {
 class VDataStreamMgr;
 class ScannerScheduler;
@@ -199,6 +202,10 @@ public:
         this->_stream_load_executor = stream_load_executor;
     }
 
+    void update_frontends(const std::vector<TFrontendInfo>& new_infos);
+    std::map<TNetworkAddress, FrontendInfo> get_frontends();
+    std::map<TNetworkAddress, FrontendInfo> get_running_frontends();
+
 private:
     Status _init(const std::vector<StorePath>& store_paths);
     void _destroy();
@@ -277,6 +284,9 @@ private:
 
     std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
     std::shared_mutex _zone_cache_rw_lock;
+
+    std::mutex _frontends_lock;
+    std::map<TNetworkAddress, FrontendInfo> _frontends;
 };
 
 template <>
diff --git a/be/src/runtime/external_scan_context_mgr.cpp 
b/be/src/runtime/external_scan_context_mgr.cpp
index 9bb095d9a6..2a3dc92521 100644
--- a/be/src/runtime/external_scan_context_mgr.cpp
+++ b/be/src/runtime/external_scan_context_mgr.cpp
@@ -103,7 +103,8 @@ Status ExternalScanContextMgr::clear_scan_context(const 
std::string& context_id)
     }
     if (context != nullptr) {
         // first cancel the fragment instance, just ignore return status
-        _exec_env->fragment_mgr()->cancel(context->fragment_instance_id);
+        
_exec_env->fragment_mgr()->cancel_instance(context->fragment_instance_id,
+                                                   
PPlanFragmentCancelReason::INTERNAL_ERROR);
         // clear the fragment instance's related result queue
         _exec_env->result_queue_mgr()->cancel(context->fragment_instance_id);
         LOG(INFO) << "close scan context: context id [ " << context_id << " ]";
@@ -143,7 +144,8 @@ void ExternalScanContextMgr::gc_expired_context() {
         }
         for (auto expired_context : expired_contexts) {
             // must cancel the fragment instance, otherwise return thrift 
transport TTransportException
-            
_exec_env->fragment_mgr()->cancel(expired_context->fragment_instance_id);
+            
_exec_env->fragment_mgr()->cancel_instance(expired_context->fragment_instance_id,
+                                                       
PPlanFragmentCancelReason::INTERNAL_ERROR);
             
_exec_env->result_queue_mgr()->cancel(expired_context->fragment_instance_id);
         }
     }
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 75a25a48ff..4579f80bd1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -42,11 +42,13 @@
 
 #include <atomic>
 
+#include "common/status.h"
 #include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
 #include <map>
 #include <memory>
+#include <mutex>
 #include <sstream>
 #include <utility>
 
@@ -61,6 +63,7 @@
 #include "runtime/client_cache.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
+#include "runtime/frontend_info.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/plan_fragment_executor.h"
 #include "runtime/primitive_type.h"
@@ -75,6 +78,7 @@
 #include "runtime/thread_context.h"
 #include "runtime/types.h"
 #include "service/backend_options.h"
+#include "util/debug_util.h"
 #include "util/doris_metrics.h"
 #include "util/hash_util.hpp"
 #include "util/mem_info.h"
@@ -503,7 +507,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
 
         LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id().hi, 
query_ctx->query_id().lo)
                   << " coord_addr " << query_ctx->coord_addr
-                  << " total fragment num on current host: " << 
params.fragment_num_on_host;
+                  << " total fragment num on current host: " << 
params.fragment_num_on_host
+                  << " fe process uuid: " << 
params.query_options.fe_process_uuid;
         query_ctx->query_globals = params.query_globals;
 
         if (params.__isset.resource_info) {
@@ -842,72 +847,120 @@ void FragmentMgr::_set_scan_concurrency(const Param& 
params, QueryContext* query
 #endif
 }
 
-void FragmentMgr::cancel(const TUniqueId& fragment_id, const 
PPlanFragmentCancelReason& reason,
-                         const std::string& msg) {
-    bool find_the_fragment = false;
+void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
+                               const std::string& msg) {
+    std::unique_lock<std::mutex> state_lock(_lock); // *_unlocked helpers require _lock held
+    return cancel_query_unlocked(query_id, reason, state_lock, msg);
+}
 
-    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _fragment_map.find(fragment_id);
-        if (iter != _fragment_map.end()) {
-            fragment_executor = iter->second;
-        }
-    }
-    if (fragment_executor) {
-        find_the_fragment = true;
-        fragment_executor->cancel(reason, msg);
+// Cancel all instances/fragments of query, and set query_ctx of the query 
canceled at last.
+void FragmentMgr::cancel_query_unlocked(const TUniqueId& query_id,
+                                        const PPlanFragmentCancelReason& 
reason,
+                                        const std::unique_lock<std::mutex>& 
state_lock,
+                                        const std::string& msg) {
+    auto ctx = _query_ctx_map.find(query_id);
+
+    if (ctx == _query_ctx_map.end()) {
+        LOG(WARNING) << "Query " << print_id(query_id) << " does not exists, 
failed to cancel it";
+        return;
     }
 
-    std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_fragment_ctx;
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _pipeline_map.find(fragment_id);
-        if (iter != _pipeline_map.end()) {
-            pipeline_fragment_ctx = iter->second;
+    if (ctx->second->enable_pipeline_exec()) {
+        for (auto it : ctx->second->fragment_ids) {
+            // instance_id will not be removed from query_context.instance_ids 
currently
+            // and it will be removed from fragment_mgr::_pipeline_map only.
+            // so we add this check to avoid too many WARNING log.
+            if (_pipeline_map.contains(it)) {
+                cancel_instance_unlocked(it, reason, state_lock, msg);
+            }
+        }
+    } else {
+        for (auto it : ctx->second->fragment_ids) {
+            cancel_fragment_unlocked(it, reason, state_lock, msg);
         }
-    }
-    if (pipeline_fragment_ctx) {
-        find_the_fragment = true;
-        pipeline_fragment_ctx->cancel(reason, msg);
     }
 
-    if (!find_the_fragment) {
-        LOG(WARNING) << "Do not find the fragment instance id:" << fragment_id 
<< " to cancel";
-    }
+    LOG(INFO) << "Query " << print_id(query_id) << " is cancelled. Reason: " 
<< msg;
+    ctx->second->cancel(true, msg, Status::Cancelled(msg));
+    _query_ctx_map.erase(query_id);
 }
 
-void FragmentMgr::cancel_query(const TUniqueId& query_id, const 
PPlanFragmentCancelReason& reason,
-                               const std::string& msg) {
-    std::vector<TUniqueId> cancel_fragment_ids;
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto ctx = _query_ctx_map.find(query_id);
-        if (ctx != _query_ctx_map.end()) {
-            cancel_fragment_ids = ctx->second->fragment_ids;
+void FragmentMgr::cancel_fragment(const TUniqueId& fragment_id,
+                                  const PPlanFragmentCancelReason& reason, 
const std::string& msg) {
+    std::unique_lock<std::mutex> state_lock(_lock);
+    return cancel_fragment_unlocked(fragment_id, reason, state_lock, msg);
+}
+
+void FragmentMgr::cancel_fragment_unlocked(const TUniqueId& fragment_id,
+                                           const PPlanFragmentCancelReason& 
reason,
+                                           const std::unique_lock<std::mutex>& 
state_lock,
+                                           const std::string& msg) {
+    return cancel_unlocked_impl(fragment_id, reason, state_lock, false /*not 
pipeline query*/, msg);
+}
+
+void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
+                                  const PPlanFragmentCancelReason& reason, 
const std::string& msg) {
+    std::unique_lock<std::mutex> state_lock(_lock);
+    return cancel_instance_unlocked(instance_id, reason, state_lock, msg);
+}
+
+void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id,
+                                           const PPlanFragmentCancelReason& 
reason,
+                                           const std::unique_lock<std::mutex>& 
state_lock,
+                                           const std::string& msg) {
+    return cancel_unlocked_impl(instance_id, reason, state_lock, true 
/*pipeline query*/, msg);
+}
+
+void FragmentMgr::cancel_unlocked_impl(const TUniqueId& id, const 
PPlanFragmentCancelReason& reason,
+                                       const std::unique_lock<std::mutex>& 
/*state_lock*/,
+                                       bool is_pipeline, const std::string& 
msg) {
+    if (is_pipeline) {
+        const TUniqueId& instance_id = id;
+        auto itr = _pipeline_map.find(instance_id);
+
+        if (itr != _pipeline_map.end()) {
+            // calling PipelineFragmentContext::cancel
+            itr->second->cancel(reason, msg);
+        } else {
+            LOG(WARNING) << "Could not find the instance id:" << 
print_id(instance_id)
+                         << " to cancel";
+        }
+    } else {
+        const TUniqueId& fragment_id = id;
+        auto itr = _fragment_map.find(fragment_id);
+
+        if (itr != _fragment_map.end()) {
+            // calling PlanFragmentExecutor::cancel
+            itr->second->cancel(reason, msg);
+        } else {
+            LOG(WARNING) << "Could not find the fragment id:" << 
print_id(fragment_id)
+                         << " to cancel";
         }
-    }
-    for (auto it : cancel_fragment_ids) {
-        cancel(it, reason, msg);
     }
 }
 
 bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
     std::lock_guard<std::mutex> lock(_lock);
     auto ctx = _query_ctx_map.find(query_id);
-    if (ctx != _query_ctx_map.end()) {
-        for (auto it : ctx->second->fragment_ids) {
-            auto fragment_executor_iter = _fragment_map.find(it);
-            if (fragment_executor_iter != _fragment_map.end() && 
fragment_executor_iter->second) {
-                return fragment_executor_iter->second->is_canceled();
-            }
 
-            auto pipeline_ctx_iter = _pipeline_map.find(it);
-            if (pipeline_ctx_iter != _pipeline_map.end() && 
pipeline_ctx_iter->second) {
-                return pipeline_ctx_iter->second->is_canceled();
+    if (ctx != _query_ctx_map.end()) {
+        const bool is_pipeline_version = ctx->second->enable_pipeline_exec();
+        for (auto itr : ctx->second->fragment_ids) {
+            if (is_pipeline_version) {
+                auto pipeline_ctx_iter = _pipeline_map.find(itr);
+                if (pipeline_ctx_iter != _pipeline_map.end() && 
pipeline_ctx_iter->second) {
+                    return pipeline_ctx_iter->second->is_canceled();
+                }
+            } else {
+                auto fragment_executor_iter = _fragment_map.find(itr);
+                if (fragment_executor_iter != _fragment_map.end() &&
+                    fragment_executor_iter->second) {
+                    return fragment_executor_iter->second->is_canceled();
+                }
             }
         }
     }
+
     return true;
 }
 
@@ -915,7 +968,7 @@ void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
     do {
         std::vector<TUniqueId> to_cancel;
-        std::vector<TUniqueId> to_cancel_queries;
+        std::vector<TUniqueId> queries_to_cancel;
         vectorized::VecDateTimeValue now = 
vectorized::VecDateTimeValue::local_time();
         {
             std::lock_guard<std::mutex> lock(_lock);
@@ -931,13 +984,40 @@ void FragmentMgr::cancel_worker() {
                     ++it;
                 }
             }
+
+            const auto& running_fes = ExecEnv::GetInstance()->get_running_frontends();
+            for (const auto& q : _query_ctx_map) {
+                auto itr = running_fes.find(q.second->coord_addr);
+                // We use conservative strategy, do not cancel if:
+                // 0. No running fe is known at all (e.g. an older fe sends no frontend_infos)
+                // 1. Query's fe is running with the same process uuid
+                // 2. Query's fe has zero process uuid
+                // 3. Query's process uuid is zero
+                if (running_fes.empty() ||
+                    (itr != running_fes.end() &&
+                     (q.second->get_fe_process_uuid() == itr->second.info.process_uuid ||
+                      itr->second.info.process_uuid == 0 ||
+                      q.second->get_fe_process_uuid() == 0))) {
+                    continue;
+                }
+                // Coordinator of this query is dead or restarted, cancel the query.
+                queries_to_cancel.push_back(q.first);
+            }
         }
+
+        // TODO(zhiqiang): It seems that timeout_canceled_fragment_count is
+        // designed to count canceled fragment of non-pipeline query.
         timeout_canceled_fragment_count->increment(to_cancel.size());
         for (auto& id : to_cancel) {
-            cancel(id, PPlanFragmentCancelReason::TIMEOUT);
+            cancel_fragment(id, PPlanFragmentCancelReason::TIMEOUT);
             LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout 
fragment "
                       << print_id(id);
         }
+
+        for (const auto& qid : queries_to_cancel) {
+            cancel_query(qid, PPlanFragmentCancelReason::INTERNAL_ERROR,
+                         std::string("Coordinator dead."));
+        }
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
     LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
 }
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index cdc01627a0..8548d19d78 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -93,15 +93,28 @@ public:
 
     Status start_query_execution(const PExecPlanFragmentStartRequest* request);
 
-    void cancel(const TUniqueId& fragment_id) {
-        cancel(fragment_id, PPlanFragmentCancelReason::INTERNAL_ERROR);
-    }
-
-    void cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& 
reason,
-                const std::string& msg = "");
-
+    // This method can only be used to cancel a fragment of non-pipeline query.
+    void cancel_fragment(const TUniqueId& fragment_id, const 
PPlanFragmentCancelReason& reason,
+                         const std::string& msg = "");
+    void cancel_fragment_unlocked(const TUniqueId& instance_id,
+                                  const PPlanFragmentCancelReason& reason,
+                                  const std::unique_lock<std::mutex>& 
state_lock,
+                                  const std::string& msg = "");
+
+    // Pipeline version, cancel a fragment instance.
+    void cancel_instance(const TUniqueId& instance_id, const 
PPlanFragmentCancelReason& reason,
+                         const std::string& msg = "");
+    void cancel_instance_unlocked(const TUniqueId& instance_id,
+                                  const PPlanFragmentCancelReason& reason,
+                                  const std::unique_lock<std::mutex>& 
state_lock,
+                                  const std::string& msg = "");
+
+    // Can be used in both version.
     void cancel_query(const TUniqueId& query_id, const 
PPlanFragmentCancelReason& reason,
                       const std::string& msg = "");
+    void cancel_query_unlocked(const TUniqueId& query_id, const 
PPlanFragmentCancelReason& reason,
+                               const std::unique_lock<std::mutex>& state_lock,
+                               const std::string& msg = "");
 
     bool query_is_canceled(const TUniqueId& query_id);
 
@@ -132,6 +145,10 @@ public:
     ThreadPool* get_thread_pool() { return _thread_pool.get(); }
 
 private:
+    void cancel_unlocked_impl(const TUniqueId& id, const 
PPlanFragmentCancelReason& reason,
+                              const std::unique_lock<std::mutex>& state_lock, 
bool is_pipeline,
+                              const std::string& msg = "");
+
     void _exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
                       const FinishCallback& cb);
 
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/frontend_info.h
similarity index 83%
copy from be/src/runtime/exec_env.cpp
copy to be/src/runtime/frontend_info.h
index 8f63d6fe6a..c16d63096f 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/frontend_info.h
@@ -15,17 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/exec_env.h"
-
 #include <gen_cpp/HeartbeatService_types.h>
 
-namespace doris {
+#include <ctime>
 
-ExecEnv::ExecEnv() : _is_init(false) {}
+namespace doris {
 
-ExecEnv::~ExecEnv() {}
+struct FrontendInfo {
+    TFrontendInfo info;
+    std::time_t first_receiving_time_ms;
+    std::time_t last_reveiving_time_ms;
+};
 
-const std::string& ExecEnv::token() const {
-    return _master_info->token;
-}
 } // namespace doris
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp 
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index d53a3731d8..92974c73a2 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -47,7 +47,7 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
 }
 
 void ThreadMemTrackerMgr::cancel_fragment(const std::string& exceed_msg) {
-    ExecEnv::GetInstance()->fragment_mgr()->cancel(
+    ExecEnv::GetInstance()->fragment_mgr()->cancel_instance(
             _fragment_instance_id, 
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg);
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 105ac80b81..7882b21c8d 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -217,6 +217,8 @@ public:
         return _query_options.be_exec_version;
     }
 
+    [[nodiscard]] int64_t get_fe_process_uuid() const { return 
_query_options.fe_process_uuid; }
+
     RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); 
}
 
     TUniqueId query_id() const { return _query_id; }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index c04091519a..76e22fe084 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -40,6 +40,7 @@
 #include "common/factory_creator.h"
 #include "common/status.h"
 #include "gutil/integral_types.h"
+#include "util/debug_util.h"
 #include "util/runtime_profile.h"
 #include "util/telemetry/telemetry.h"
 
@@ -173,7 +174,9 @@ public:
         _is_cancelled.store(v);
         // Create a error status, so that we could print error stack, and
         // we could know which path call cancel.
-        LOG(INFO) << "task is cancelled, st = " << 
Status::Error<ErrorCode::CANCELLED>(msg);
+        LOG(WARNING) << "Task is cancelled, instance: "
+                     << PrintInstanceStandardInfo(_query_id, _fragment_id, 
_fragment_instance_id)
+                     << " st = " << Status::Error<ErrorCode::CANCELLED>(msg);
     }
 
     void set_backend_id(int64_t backend_id) { _backend_id = backend_id; }
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index 21c5c7b4f6..90419b0b5f 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -118,7 +118,8 @@ Status BackendService::start_plan_fragment_execution(const 
TExecPlanFragmentPara
 void BackendService::cancel_plan_fragment(TCancelPlanFragmentResult& 
return_val,
                                           const TCancelPlanFragmentParams& 
params) {
     LOG(INFO) << "cancel_plan_fragment(): instance_id=" << 
params.fragment_instance_id;
-    _exec_env->fragment_mgr()->cancel(params.fragment_instance_id);
+    _exec_env->fragment_mgr()->cancel_instance(params.fragment_instance_id,
+                                               
PPlanFragmentCancelReason::INTERNAL_ERROR);
 }
 
 void BackendService::transmit_data(TTransmitDataResult& return_val,
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index efd2a0af46..5effa721e1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -559,10 +559,11 @@ void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
         if (request->has_cancel_reason()) {
             LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid)
                       << ", reason: " << 
PPlanFragmentCancelReason_Name(request->cancel_reason());
-            _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+            _exec_env->fragment_mgr()->cancel_instance(tid, 
request->cancel_reason());
         } else {
             LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid);
-            _exec_env->fragment_mgr()->cancel(tid);
+            _exec_env->fragment_mgr()->cancel_instance(tid,
+                                                       
PPlanFragmentCancelReason::INTERNAL_ERROR);
         }
         // TODO: the logic seems useless, cancel only return Status::OK. 
remove it
         st.to_protobuf(result->mutable_status());
diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp
index 9ea88acc25..2d44c281a5 100644
--- a/be/src/util/debug_util.cpp
+++ b/be/src/util/debug_util.cpp
@@ -17,6 +17,7 @@
 
 #include "util/debug_util.h"
 
+#include <gen_cpp/HeartbeatService_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 #include <stdint.h>
 
@@ -26,6 +27,7 @@
 #include <utility>
 
 #include "common/version_internal.h"
+#include "util/uid_util.h"
 
 namespace doris {
 
@@ -101,4 +103,35 @@ std::string hexdump(const char* buf, int len) {
     return ss.str();
 }
 
+std::string PrintThriftNetworkAddress(const TNetworkAddress& add) {
+    std::ostringstream out; // Thrift's generated printTo() only needs an std::ostream&
+    add.printTo(out);
+    return out.str();
+}
+
+std::string PrintFrontendInfos(const std::vector<TFrontendInfo>& fe_infos) {
+    // Each entry is followed by a single space, including the last one.
+    std::stringstream ss;
+
+    for (const auto& fe_info : fe_infos) {
+        fe_info.printTo(ss);
+        ss << ' ';
+    }
+
+    return ss.str();
+}
+
+std::string PrintFrontendInfo(const TFrontendInfo& fe_info) {
+    // Render a single FE descriptor via its Thrift-generated printTo().
+    std::ostringstream out;
+    fe_info.printTo(out);
+    return out.str();
+}
+
+std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid, const TUniqueId& iid) {
+    std::ostringstream out; // Layout: InstanceId|FragmentIdx|QueryId
+    out << print_id(iid) << '|' << fid << '|' << print_id(qid);
+    return out.str();
+}
+
 } // namespace doris
diff --git a/be/src/util/debug_util.h b/be/src/util/debug_util.h
index fbc0c221f4..e6b6491b8a 100644
--- a/be/src/util/debug_util.h
+++ b/be/src/util/debug_util.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/HeartbeatService_types.h>
 #include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Types_types.h>
@@ -30,6 +31,14 @@ std::string print_tstmt_type(const TStmtType::type& type);
 std::string print_query_state(const QueryState::type& type);
 std::string PrintTUnit(const TUnit::type& type);
 std::string PrintTMetricKind(const TMetricKind::type& type);
+std::string PrintThriftNetworkAddress(const TNetworkAddress&);
+std::string PrintFrontendInfo(const TFrontendInfo& fe_info);
+std::string PrintFrontendInfos(const std::vector<TFrontendInfo>& fe_infos);
+
+// Ideally, call this function whenever instance information needs to be printed.
+// By using a fixed format, we would be able to identify all the paths in 
which this instance is executed.
+// InstanceId|FragmentIdx|QueryId
+std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid, 
const TUniqueId& iid);
 
 // Returns a string "<product version number> (build <build hash>)"
 // If compact == false, this string is appended: "\nBuilt on <build time>"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index ab23ca5064..af86bb32ef 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -241,6 +241,7 @@ import org.apache.doris.task.MasterTaskExecutor;
 import org.apache.doris.task.PriorityMasterTaskExecutor;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TCompressionType;
+import org.apache.doris.thrift.TFrontendInfo;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
@@ -477,6 +478,19 @@ public class Env {
 
     private HiveTransactionMgr hiveTransactionMgr;
 
+    public List<TFrontendInfo> getFrontendInfos() {
+        // Snapshot every known FE's RPC address and process UUID; sent to BEs in heartbeats.
+        List<TFrontendInfo> infos = new ArrayList<>();
+        for (Frontend frontend : frontends.values()) {
+            TNetworkAddress rpcAddress = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
+            TFrontendInfo info = new TFrontendInfo();
+            info.setCoordinatorAddress(rpcAddress);
+            info.setProcessUuid(frontend.getProcessUUID());
+            infos.add(info);
+        }
+        return infos;
+    }
+
     public List<Frontend> getFrontends(FrontendNodeType nodeType) {
         if (nodeType == null) {
             // get all
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7a1278266c..1c97dadf3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -72,6 +72,7 @@ import org.apache.doris.proto.Types.PUniqueId;
 import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -399,6 +400,7 @@ public class Coordinator {
         this.queryOptions.setQueryTimeout(context.getExecTimeout());
         this.queryOptions.setExecutionTimeout(context.getExecTimeout());
         
this.queryOptions.setEnableScanNodeRunSerial(context.getSessionVariable().isEnableScanRunSerial());
+        
this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
     }
 
     public ConnectContext getConnectContext() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
index f7ba622b7e..7b9721464e 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java
@@ -30,14 +30,14 @@ public class ExecuteEnv {
     private static volatile ExecuteEnv INSTANCE;
     private MultiLoadMgr multiLoadMgr;
     private ConnectScheduler scheduler;
-    private long startupTime;
+    private long processUUID;
 
     private List<FeDiskInfo> diskInfos;
 
     private ExecuteEnv() {
         multiLoadMgr = new MultiLoadMgr();
         scheduler = new ConnectScheduler(Config.qe_max_connection);
-        startupTime = System.currentTimeMillis();
+        processUUID = System.currentTimeMillis();
         diskInfos = new ArrayList<FeDiskInfo>() {{
                 add(new FeDiskInfo("meta", Config.meta_dir, 
DiskUtils.df(Config.meta_dir)));
                 add(new FeDiskInfo("log", Config.sys_log_dir, 
DiskUtils.df(Config.sys_log_dir)));
@@ -65,8 +65,8 @@ public class ExecuteEnv {
         return multiLoadMgr;
     }
 
-    public long getStartupTime() {
-        return startupTime;
+    public long getProcessUUID() {
+        return processUUID;
     }
 
     public List<FeDiskInfo> getDiskInfos() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index db7514a3be..dfb88c7610 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1933,6 +1933,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             }
             txnState.addTableIndexes(table);
             plan.setTableName(table.getName());
+            
plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
             return plan;
         } finally {
             table.readUnlock();
@@ -2033,7 +2034,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                 result.setQueryPort(Config.query_port);
                 result.setRpcPort(Config.rpc_port);
                 result.setVersion(Version.DORIS_BUILD_VERSION + "-" + 
Version.DORIS_BUILD_SHORT_HASH);
-                result.setLastStartupTime(exeEnv.getStartupTime());
+                result.setLastStartupTime(exeEnv.getProcessUUID());
                 if (exeEnv.getDiskInfos() != null) {
                     
result.setDiskInfos(FeDiskInfo.toThrifts(exeEnv.getDiskInfos()));
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
index 35964f2b2a..4f16fa1629 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
@@ -60,6 +60,8 @@ public class Frontend implements Writable {
 
     private boolean isAlive = false;
 
+    private long processUUID = 0;
+
     public Frontend() {
     }
 
@@ -122,6 +124,10 @@ public class Frontend implements Writable {
         return lastStartupTime;
     }
 
+    public long getProcessUUID() {
+        return processUUID;
+    }
+
     public long getLastUpdateTime() {
         return lastUpdateTime;
     }
@@ -150,10 +156,16 @@ public class Frontend implements Writable {
             replayedJournalId = hbResponse.getReplayedJournalId();
             lastUpdateTime = hbResponse.getHbTime();
             heartbeatErrMsg = "";
-            lastStartupTime = hbResponse.getFeStartTime();
+            lastStartupTime = hbResponse.getProcessUUID();
             diskInfos = hbResponse.getDiskInfos();
             isChanged = true;
+            processUUID = lastStartupTime;
         } else {
+            // A non-master node is disconnected.
+            // Set processUUID to zero so that the BE's heartbeat manager ignores this heartbeat
+            // and its cancel worker does not immediately cancel queries from this FE
+            // until a valid process UUID is received again.
+            processUUID = 0;
             if (isAlive) {
                 isAlive = false;
                 isChanged = true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java
index 8ad4f36e6d..54b9344ac2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java
@@ -19,6 +19,7 @@ package org.apache.doris.system;
 
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FeDiskInfo;
 
 import com.google.gson.annotations.SerializedName;
@@ -41,7 +42,7 @@ public class FrontendHbResponse extends HeartbeatResponse 
implements Writable {
     @SerializedName(value = "replayedJournalId")
     private long replayedJournalId;
     private String version;
-    private long feStartTime;
+    private long processUUID;
     private List<FeDiskInfo> diskInfos;
 
     public FrontendHbResponse() {
@@ -50,7 +51,7 @@ public class FrontendHbResponse extends HeartbeatResponse 
implements Writable {
 
     public FrontendHbResponse(String name, int queryPort, int rpcPort,
             long replayedJournalId, long hbTime, String version,
-            long feStartTime, List<FeDiskInfo> diskInfos) {
+            long processUUID, List<FeDiskInfo> diskInfos) {
         super(HeartbeatResponse.Type.FRONTEND);
         this.status = HbStatus.OK;
         this.name = name;
@@ -59,7 +60,7 @@ public class FrontendHbResponse extends HeartbeatResponse 
implements Writable {
         this.replayedJournalId = replayedJournalId;
         this.hbTime = hbTime;
         this.version = version;
-        this.feStartTime = feStartTime;
+        this.processUUID = processUUID;
         this.diskInfos = diskInfos;
     }
 
@@ -68,6 +69,7 @@ public class FrontendHbResponse extends HeartbeatResponse 
implements Writable {
         this.status = HbStatus.BAD;
         this.name = name;
         this.msg = errMsg;
+        this.processUUID = ExecuteEnv.getInstance().getProcessUUID();
     }
 
     public String getName() {
@@ -90,8 +92,8 @@ public class FrontendHbResponse extends HeartbeatResponse 
implements Writable {
         return version;
     }
 
-    public long getFeStartTime() {
-        return feStartTime;
+    public long getProcessUUID() {
+        return processUUID;
     }
 
     public List<FeDiskInfo> getDiskInfos() {
@@ -116,7 +118,7 @@ public class FrontendHbResponse extends HeartbeatResponse 
implements Writable {
         sb.append(", queryPort: ").append(queryPort);
         sb.append(", rpcPort: ").append(rpcPort);
         sb.append(", replayedJournalId: ").append(replayedJournalId);
-        sb.append(", festartTime: ").append(feStartTime);
+        sb.append(", festartTime: ").append(processUUID);
         return sb.toString();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 6cfcff34e0..87c7073142 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -39,6 +39,7 @@ import org.apache.doris.thrift.TBrokerOperationStatus;
 import org.apache.doris.thrift.TBrokerOperationStatusCode;
 import org.apache.doris.thrift.TBrokerPingBrokerRequest;
 import org.apache.doris.thrift.TBrokerVersion;
+import org.apache.doris.thrift.TFrontendInfo;
 import org.apache.doris.thrift.TFrontendPingFrontendRequest;
 import org.apache.doris.thrift.TFrontendPingFrontendResult;
 import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
@@ -101,11 +102,12 @@ public class HeartbeatMgr extends MasterDaemon {
      */
     @Override
     protected void runAfterCatalogReady() {
+        // Get feInfos of previous iteration.
+        List<TFrontendInfo> feInfos = Env.getCurrentEnv().getFrontendInfos();
         List<Future<HeartbeatResponse>> hbResponses = Lists.newArrayList();
-
         // send backend heartbeat
         for (Backend backend : nodeMgr.getIdToBackend().values()) {
-            BackendHeartbeatHandler handler = new 
BackendHeartbeatHandler(backend);
+            BackendHeartbeatHandler handler = new 
BackendHeartbeatHandler(backend, feInfos);
             hbResponses.add(executor.submit(handler));
         }
 
@@ -204,9 +206,11 @@ public class HeartbeatMgr extends MasterDaemon {
     // backend heartbeat
     private class BackendHeartbeatHandler implements 
Callable<HeartbeatResponse> {
         private Backend backend;
+        private List<TFrontendInfo> feInfos;
 
-        public BackendHeartbeatHandler(Backend backend) {
+        public BackendHeartbeatHandler(Backend backend, List<TFrontendInfo> 
feInfos) {
             this.backend = backend;
+            this.feInfos = feInfos;
         }
 
         @Override
@@ -222,6 +226,7 @@ public class HeartbeatMgr extends MasterDaemon {
                 long flags = heartbeatFlags.getHeartbeatFlags();
                 copiedMasterInfo.setHeartbeatFlags(flags);
                 copiedMasterInfo.setBackendId(backendId);
+                copiedMasterInfo.setFrontendInfos(feInfos);
                 THeartbeatResult result;
                 if (!FeConstants.runningUnitTest) {
                     client = 
ClientPool.backendHeartbeatPool.borrowObject(beAddr);
@@ -301,7 +306,7 @@ public class HeartbeatMgr extends MasterDaemon {
                     return new FrontendHbResponse(fe.getNodeName(), 
Config.query_port, Config.rpc_port,
                             Env.getCurrentEnv().getMaxJournalId(), 
System.currentTimeMillis(),
                             Version.DORIS_BUILD_VERSION + "-" + 
Version.DORIS_BUILD_SHORT_HASH,
-                            ExecuteEnv.getInstance().getStartupTime(), 
ExecuteEnv.getInstance().getDiskInfos());
+                            ExecuteEnv.getInstance().getProcessUUID(), 
ExecuteEnv.getInstance().getDiskInfos());
                 } else {
                     return new FrontendHbResponse(fe.getNodeName(), "not 
ready");
                 }
diff --git a/gensrc/thrift/HeartbeatService.thrift 
b/gensrc/thrift/HeartbeatService.thrift
index 38ea17e907..8361fcbf61 100644
--- a/gensrc/thrift/HeartbeatService.thrift
+++ b/gensrc/thrift/HeartbeatService.thrift
@@ -33,6 +33,7 @@ struct TMasterInfo {
     6: optional Types.TPort http_port
     7: optional i64 heartbeat_flags
     8: optional i64 backend_id
+    9: optional list<TFrontendInfo> frontend_infos
 }
 
 struct TBackendInfo {
@@ -53,3 +54,8 @@ struct THeartbeatResult {
 service HeartbeatService {
     THeartbeatResult heartbeat(1:TMasterInfo master_info);
 }
+
+struct TFrontendInfo {
+    1: optional Types.TNetworkAddress coordinator_address
+    2: optional i64 process_uuid
+}
\ No newline at end of file
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index a87238dfff..2f20057840 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -237,6 +237,9 @@ struct TQueryOptions {
   80: optional bool enable_memtable_on_sink_node = false;
 
   81: optional bool enable_delete_sub_predicate_v2 = false;
+
+  // Start epoch (process UUID) of the FE that issued this query; used to detect FE restarts.
+  82: optional i64 fe_process_uuid = 0;
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to