This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c083336bbe [Improvement](pipeline) Cancel outdated query if original fe restarts (#23582) c083336bbe is described below commit c083336bbee1ab2a07a544e5f25a226cd5c28302 Author: hzq <seuhezhiqi...@163.com> AuthorDate: Thu Aug 31 17:58:52 2023 +0800 [Improvement](pipeline) Cancel outdated query if original fe restarts (#23582) If any FE restarts, queries that were emitted from this FE will be cancelled. Implementation of #23704 --- be/src/agent/heartbeat_server.cpp | 8 +- be/src/common/config.cpp | 2 + be/src/common/config.h | 4 + be/src/pipeline/pipeline_fragment_context.cpp | 9 +- be/src/pipeline/pipeline_fragment_context.h | 2 + be/src/pipeline/task_scheduler.cpp | 8 +- be/src/runtime/exec_env.cpp | 105 ++++++++++++ be/src/runtime/exec_env.h | 10 ++ be/src/runtime/external_scan_context_mgr.cpp | 6 +- be/src/runtime/fragment_mgr.cpp | 180 +++++++++++++++------ be/src/runtime/fragment_mgr.h | 31 +++- be/src/runtime/{exec_env.cpp => frontend_info.h} | 15 +- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 2 +- be/src/runtime/query_context.h | 2 + be/src/runtime/runtime_state.h | 5 +- be/src/service/backend_service.cpp | 3 +- be/src/service/internal_service.cpp | 5 +- be/src/util/debug_util.cpp | 33 ++++ be/src/util/debug_util.h | 9 ++ .../main/java/org/apache/doris/catalog/Env.java | 14 ++ .../main/java/org/apache/doris/qe/Coordinator.java | 2 + .../java/org/apache/doris/service/ExecuteEnv.java | 8 +- .../apache/doris/service/FrontendServiceImpl.java | 3 +- .../java/org/apache/doris/system/Frontend.java | 14 +- .../apache/doris/system/FrontendHbResponse.java | 14 +- .../java/org/apache/doris/system/HeartbeatMgr.java | 13 +- gensrc/thrift/HeartbeatService.thrift | 6 + gensrc/thrift/PaloInternalService.thrift | 3 + 28 files changed, 423 insertions(+), 93 deletions(-) diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 855be182ff..71e8b0adcb 100644 --- 
a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -62,6 +62,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result, << "host:" << master_info.network_address.hostname << ", port:" << master_info.network_address.port << ", cluster id:" << master_info.cluster_id + << ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos) << ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch; MonotonicStopWatch watch; @@ -97,7 +98,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { _master_info->cluster_id = master_info.cluster_id; LOG(INFO) << "record cluster id. host: " << master_info.network_address.hostname << ". port: " << master_info.network_address.port - << ". cluster id: " << master_info.cluster_id; + << ". cluster id: " << master_info.cluster_id + << ". frontend_infos: " << PrintFrontendInfos(master_info.frontend_infos); } else { if (_master_info->cluster_id != master_info.cluster_id) { return Status::InternalError("invalid cluster id. ignore. cluster_id={}", @@ -210,6 +212,10 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { _master_info->__set_backend_id(master_info.backend_id); } + if (master_info.__isset.frontend_infos) { + ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos); + } + if (need_report) { LOG(INFO) << "Master FE is changed or restarted. 
report tablet and disk info immediately"; _olap_engine->notify_listeners(); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 3ada8cb82f..873db8fd67 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1078,6 +1078,8 @@ DEFINE_mString(user_files_secure_path, "${DORIS_HOME}"); DEFINE_Int32(partition_topn_partition_threshold, "1024"); +DEFINE_Int32(fe_expire_duration_seconds, "60"); + #ifdef BE_TEST // test s3 DEFINE_String(test_s3_resource, "resource"); diff --git a/be/src/common/config.h b/be/src/common/config.h index beca3957ef..9c452c8cfc 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1137,6 +1137,10 @@ DECLARE_mString(user_files_secure_path); // and if this threshold is exceeded, the remaining data will be pass through to other node directly. DECLARE_Int32(partition_topn_partition_threshold); +// If fe's frontend info has not been updated for more than fe_expire_duration_seconds, it will be regarded +// as an abnormal fe, this will cause be to cancel this fe's related query. +DECLARE_Int32(fe_expire_duration_seconds); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 10a6518ff1..97689ed001 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -151,7 +151,10 @@ PipelineFragmentContext::~PipelineFragmentContext() { void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) { - LOG(WARNING) << "PipelineFragmentContext Canceled. 
reason=" << msg; + LOG(WARNING) << "PipelineFragmentContext " + << PrintInstanceStandardInfo(_query_id, _fragment_id, _fragment_instance_id) + << " is canceled, cancel message: " << msg; + // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe // For stream load the fragment's query_id == load id, it is set in FE. auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id); @@ -194,8 +197,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re } LOG_INFO("PipelineFragmentContext::prepare") - .tag("query_id", _query_id) - .tag("instance_id", local_params.fragment_instance_id) + .tag("query_id", print_id(_query_id)) + .tag("instance_id", print_id(local_params.fragment_instance_id)) .tag("backend_num", local_params.backend_num) .tag("pthread_id", (uintptr_t)pthread_self()); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index de3451d11a..4b35c206e5 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -101,6 +101,8 @@ public: TUniqueId get_query_id() const { return _query_id; } + [[nodiscard]] int get_fragment_id() const { return _fragment_id; } + void close_a_pipeline(); std::string to_http_path(const std::string& file_name); diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 11d20ac7a9..8b2f8e3f0c 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -36,6 +36,7 @@ #include "pipeline/task_queue.h" #include "pipeline_fragment_context.h" #include "runtime/query_context.h" +#include "util/debug_util.h" #include "util/sse_util.hpp" #include "util/thread.h" #include "util/threadpool.h" @@ -269,7 +270,12 @@ void TaskScheduler::_do_work(size_t index) { task->set_previous_core_id(index); if (!status.ok()) { task->set_eos_time(); - LOG(WARNING) << fmt::format("Pipeline task failed. 
reason: {}", status.to_string()); + LOG(WARNING) << fmt::format( + "Pipeline task failed. query_id: {} reason: {}", + PrintInstanceStandardInfo(task->query_context()->query_id(), + task->fragment_context()->get_fragment_id(), + task->fragment_context()->get_fragment_instance_id()), + status.to_string()); // Print detail informations below when you debugging here. // // LOG(WARNING)<< "task:\n"<<task->debug_string(); diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 8f63d6fe6a..333ba4f97a 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -19,6 +19,15 @@ #include <gen_cpp/HeartbeatService_types.h> +#include <mutex> +#include <utility> + +#include "common/config.h" +#include "runtime/frontend_info.h" +#include "time.h" +#include "util/debug_util.h" +#include "util/time.h" + namespace doris { ExecEnv::ExecEnv() : _is_init(false) {} @@ -28,4 +37,100 @@ ExecEnv::~ExecEnv() {} const std::string& ExecEnv::token() const { return _master_info->token; } + +std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_frontends() { + std::lock_guard<std::mutex> lg(_frontends_lock); + return _frontends; +} + +void ExecEnv::update_frontends(const std::vector<TFrontendInfo>& new_fe_infos) { + std::lock_guard<std::mutex> lg(_frontends_lock); + + std::set<TNetworkAddress> dropped_fes; + + for (const auto& cur_fe : _frontends) { + dropped_fes.insert(cur_fe.first); + } + + for (const auto& coming_fe_info : new_fe_infos) { + auto itr = _frontends.find(coming_fe_info.coordinator_address); + + if (itr == _frontends.end()) { + LOG(INFO) << "A completely new frontend, " << PrintFrontendInfo(coming_fe_info); + + _frontends.insert(std::pair<TNetworkAddress, FrontendInfo>( + coming_fe_info.coordinator_address, + FrontendInfo {coming_fe_info, GetCurrentTimeMicros() / 1000, /*first time*/ + GetCurrentTimeMicros() / 1000 /*last time*/})); + + continue; + } + + dropped_fes.erase(coming_fe_info.coordinator_address); + + if (coming_fe_info.process_uuid 
== 0) { + LOG(WARNING) << "Frontend " << PrintFrontendInfo(coming_fe_info) + << " is in an unknown state."; + } + + if (coming_fe_info.process_uuid == itr->second.info.process_uuid) { + itr->second.last_reveiving_time_ms = GetCurrentTimeMicros() / 1000; + continue; + } + + // If we get here, means this frontend has already restarted. + itr->second.info.process_uuid = coming_fe_info.process_uuid; + itr->second.first_receiving_time_ms = GetCurrentTimeMicros() / 1000; + itr->second.last_reveiving_time_ms = GetCurrentTimeMicros() / 1000; + LOG(INFO) << "Update frontend " << PrintFrontendInfo(coming_fe_info); + } + + for (const auto& dropped_fe : dropped_fes) { + LOG(INFO) << "Frontend " << PrintThriftNetworkAddress(dropped_fe) + << " has already been dropped, remove it"; + _frontends.erase(dropped_fe); + } +} + +std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_running_frontends() { + std::lock_guard<std::mutex> lg(_frontends_lock); + std::map<TNetworkAddress, FrontendInfo> res; + const int expired_duration = config::fe_expire_duration_seconds * 1000; + const auto now = GetCurrentTimeMicros() / 1000; + + for (const auto& pair : _frontends) { + if (pair.second.info.process_uuid != 0) { + if (now - pair.second.last_reveiving_time_ms < expired_duration) { + // If fe info has just been update in last expired_duration, regard it as running. + res[pair.first] = pair.second; + } else { + // Fe info has not been udpate for more than expired_duration, regard it as an abnormal. + // Abnormal means this fe can not connect to master, and it is not dropped from cluster. + // or fe do not have master yet. 
+ LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info) + << " has not update its hb " + << "for more than " << config::fe_expire_duration_seconds + << " secs, regard it as abnormal."; + } + + continue; + } + + if (pair.second.last_reveiving_time_ms - pair.second.first_receiving_time_ms > + expired_duration) { + // A zero process-uuid that sustains more than 60 seconds(default). + // We will regard this fe as a abnormal frontend. + LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info) + << " has not update its hb " + << "for more than " << config::fe_expire_duration_seconds + << " secs, regard it as abnormal."; + continue; + } else { + res[pair.first] = pair.second; + } + } + + return res; +} + } // namespace doris diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 18dd926ba6..997be48974 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -17,11 +17,13 @@ #pragma once +#include <gen_cpp/HeartbeatService_types.h> #include <stddef.h> #include <algorithm> #include <map> #include <memory> +#include <mutex> #include <shared_mutex> #include <string> #include <unordered_map> @@ -34,6 +36,7 @@ #include "vec/common/hash_table/phmap_fwd_decl.h" namespace doris { +struct FrontendInfo; namespace vectorized { class VDataStreamMgr; class ScannerScheduler; @@ -199,6 +202,10 @@ public: this->_stream_load_executor = stream_load_executor; } + void update_frontends(const std::vector<TFrontendInfo>& new_infos); + std::map<TNetworkAddress, FrontendInfo> get_frontends(); + std::map<TNetworkAddress, FrontendInfo> get_running_frontends(); + private: Status _init(const std::vector<StorePath>& store_paths); void _destroy(); @@ -277,6 +284,9 @@ private: std::unique_ptr<vectorized::ZoneList> _global_zone_cache; std::shared_mutex _zone_cache_rw_lock; + + std::mutex _frontends_lock; + std::map<TNetworkAddress, FrontendInfo> _frontends; }; template <> diff --git a/be/src/runtime/external_scan_context_mgr.cpp 
b/be/src/runtime/external_scan_context_mgr.cpp index 9bb095d9a6..2a3dc92521 100644 --- a/be/src/runtime/external_scan_context_mgr.cpp +++ b/be/src/runtime/external_scan_context_mgr.cpp @@ -103,7 +103,8 @@ Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id) } if (context != nullptr) { // first cancel the fragment instance, just ignore return status - _exec_env->fragment_mgr()->cancel(context->fragment_instance_id); + _exec_env->fragment_mgr()->cancel_instance(context->fragment_instance_id, + PPlanFragmentCancelReason::INTERNAL_ERROR); // clear the fragment instance's related result queue _exec_env->result_queue_mgr()->cancel(context->fragment_instance_id); LOG(INFO) << "close scan context: context id [ " << context_id << " ]"; @@ -143,7 +144,8 @@ void ExternalScanContextMgr::gc_expired_context() { } for (auto expired_context : expired_contexts) { // must cancel the fragment instance, otherwise return thrift transport TTransportException - _exec_env->fragment_mgr()->cancel(expired_context->fragment_instance_id); + _exec_env->fragment_mgr()->cancel_instance(expired_context->fragment_instance_id, + PPlanFragmentCancelReason::INTERNAL_ERROR); _exec_env->result_queue_mgr()->cancel(expired_context->fragment_instance_id); } } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 75a25a48ff..4579f80bd1 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -42,11 +42,13 @@ #include <atomic> +#include "common/status.h" #include "pipeline/pipeline_x/pipeline_x_fragment_context.h" // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep #include <map> #include <memory> +#include <mutex> #include <sstream> #include <utility> @@ -61,6 +63,7 @@ #include "runtime/client_cache.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" +#include "runtime/frontend_info.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/plan_fragment_executor.h" 
#include "runtime/primitive_type.h" @@ -75,6 +78,7 @@ #include "runtime/thread_context.h" #include "runtime/types.h" #include "service/backend_options.h" +#include "util/debug_util.h" #include "util/doris_metrics.h" #include "util/hash_util.hpp" #include "util/mem_info.h" @@ -503,7 +507,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id().hi, query_ctx->query_id().lo) << " coord_addr " << query_ctx->coord_addr - << " total fragment num on current host: " << params.fragment_num_on_host; + << " total fragment num on current host: " << params.fragment_num_on_host + << " fe process uuid: " << params.query_options.fe_process_uuid; query_ctx->query_globals = params.query_globals; if (params.__isset.resource_info) { @@ -842,72 +847,120 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query #endif } -void FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason, - const std::string& msg) { - bool find_the_fragment = false; +void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason, + const std::string& msg) { + std::unique_lock<std::mutex> state_lock; + return cancel_query_unlocked(query_id, reason, state_lock, msg); +} - std::shared_ptr<PlanFragmentExecutor> fragment_executor; - { - std::lock_guard<std::mutex> lock(_lock); - auto iter = _fragment_map.find(fragment_id); - if (iter != _fragment_map.end()) { - fragment_executor = iter->second; - } - } - if (fragment_executor) { - find_the_fragment = true; - fragment_executor->cancel(reason, msg); +// Cancel all instances/fragments of query, and set query_ctx of the query canceled at last. 
+void FragmentMgr::cancel_query_unlocked(const TUniqueId& query_id, + const PPlanFragmentCancelReason& reason, + const std::unique_lock<std::mutex>& state_lock, + const std::string& msg) { + auto ctx = _query_ctx_map.find(query_id); + + if (ctx == _query_ctx_map.end()) { + LOG(WARNING) << "Query " << print_id(query_id) << " does not exists, failed to cancel it"; + return; } - std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_fragment_ctx; - { - std::lock_guard<std::mutex> lock(_lock); - auto iter = _pipeline_map.find(fragment_id); - if (iter != _pipeline_map.end()) { - pipeline_fragment_ctx = iter->second; + if (ctx->second->enable_pipeline_exec()) { + for (auto it : ctx->second->fragment_ids) { + // instance_id will not be removed from query_context.instance_ids currently + // and it will be removed from fragment_mgr::_pipeline_map only. + // so we add this check to avoid too many WARNING log. + if (_pipeline_map.contains(it)) { + cancel_instance_unlocked(it, reason, state_lock, msg); + } + } + } else { + for (auto it : ctx->second->fragment_ids) { + cancel_fragment_unlocked(it, reason, state_lock, msg); } - } - if (pipeline_fragment_ctx) { - find_the_fragment = true; - pipeline_fragment_ctx->cancel(reason, msg); } - if (!find_the_fragment) { - LOG(WARNING) << "Do not find the fragment instance id:" << fragment_id << " to cancel"; - } + LOG(INFO) << "Query " << print_id(query_id) << " is cancelled. 
Reason: " << msg; + ctx->second->cancel(true, msg, Status::Cancelled(msg)); + _query_ctx_map.erase(query_id); } -void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason, - const std::string& msg) { - std::vector<TUniqueId> cancel_fragment_ids; - { - std::lock_guard<std::mutex> lock(_lock); - auto ctx = _query_ctx_map.find(query_id); - if (ctx != _query_ctx_map.end()) { - cancel_fragment_ids = ctx->second->fragment_ids; +void FragmentMgr::cancel_fragment(const TUniqueId& fragment_id, + const PPlanFragmentCancelReason& reason, const std::string& msg) { + std::unique_lock<std::mutex> state_lock(_lock); + return cancel_fragment_unlocked(fragment_id, reason, state_lock, msg); +} + +void FragmentMgr::cancel_fragment_unlocked(const TUniqueId& fragment_id, + const PPlanFragmentCancelReason& reason, + const std::unique_lock<std::mutex>& state_lock, + const std::string& msg) { + return cancel_unlocked_impl(fragment_id, reason, state_lock, false /*not pipeline query*/, msg); +} + +void FragmentMgr::cancel_instance(const TUniqueId& instance_id, + const PPlanFragmentCancelReason& reason, const std::string& msg) { + std::unique_lock<std::mutex> state_lock(_lock); + return cancel_instance_unlocked(instance_id, reason, state_lock, msg); +} + +void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id, + const PPlanFragmentCancelReason& reason, + const std::unique_lock<std::mutex>& state_lock, + const std::string& msg) { + return cancel_unlocked_impl(instance_id, reason, state_lock, true /*pipeline query*/, msg); +} + +void FragmentMgr::cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason, + const std::unique_lock<std::mutex>& /*state_lock*/, + bool is_pipeline, const std::string& msg) { + if (is_pipeline) { + const TUniqueId& instance_id = id; + auto itr = _pipeline_map.find(instance_id); + + if (itr != _pipeline_map.end()) { + // calling PipelineFragmentContext::cancel + 
itr->second->cancel(reason, msg); + } else { + LOG(WARNING) << "Could not find the instance id:" << print_id(instance_id) + << " to cancel"; + } + } else { + const TUniqueId& fragment_id = id; + auto itr = _fragment_map.find(fragment_id); + + if (itr != _fragment_map.end()) { + // calling PlanFragmentExecutor::cancel + itr->second->cancel(reason, msg); + } else { + LOG(WARNING) << "Could not find the fragment id:" << print_id(fragment_id) + << " to cancel"; } - } - for (auto it : cancel_fragment_ids) { - cancel(it, reason, msg); } } bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) { std::lock_guard<std::mutex> lock(_lock); auto ctx = _query_ctx_map.find(query_id); - if (ctx != _query_ctx_map.end()) { - for (auto it : ctx->second->fragment_ids) { - auto fragment_executor_iter = _fragment_map.find(it); - if (fragment_executor_iter != _fragment_map.end() && fragment_executor_iter->second) { - return fragment_executor_iter->second->is_canceled(); - } - auto pipeline_ctx_iter = _pipeline_map.find(it); - if (pipeline_ctx_iter != _pipeline_map.end() && pipeline_ctx_iter->second) { - return pipeline_ctx_iter->second->is_canceled(); + if (ctx != _query_ctx_map.end()) { + const bool is_pipeline_version = ctx->second->enable_pipeline_exec(); + for (auto itr : ctx->second->fragment_ids) { + if (is_pipeline_version) { + auto pipeline_ctx_iter = _pipeline_map.find(itr); + if (pipeline_ctx_iter != _pipeline_map.end() && pipeline_ctx_iter->second) { + return pipeline_ctx_iter->second->is_canceled(); + } + } else { + auto fragment_executor_iter = _fragment_map.find(itr); + if (fragment_executor_iter != _fragment_map.end() && + fragment_executor_iter->second) { + return fragment_executor_iter->second->is_canceled(); + } } } } + return true; } @@ -915,7 +968,7 @@ void FragmentMgr::cancel_worker() { LOG(INFO) << "FragmentMgr cancel worker start working."; do { std::vector<TUniqueId> to_cancel; - std::vector<TUniqueId> to_cancel_queries; + std::vector<TUniqueId> 
queries_to_cancel; vectorized::VecDateTimeValue now = vectorized::VecDateTimeValue::local_time(); { std::lock_guard<std::mutex> lock(_lock); @@ -931,13 +984,40 @@ void FragmentMgr::cancel_worker() { ++it; } } + + const auto& running_fes = ExecEnv::GetInstance()->get_running_frontends(); + for (const auto& q : _query_ctx_map) { + auto itr = running_fes.find(q.second->coord_addr); + if (itr != running_fes.end()) { + // We use conservative strategy. + // 1. If same process uuid, do not cancel + // 2. If fe has zero process uuid, do not cancel + // 3. If query's process uuid is zero, do not cancel + if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid || + itr->second.info.process_uuid == 0 || + q.second->get_fe_process_uuid() == 0) { + continue; + } + } + + // Coorninator of this query has already dead. + queries_to_cancel.push_back(q.first); + } } + + // TODO(zhiqiang): It seems that timeout_canceled_fragment_count is + // designed to count canceled fragment of non-pipeline query. 
timeout_canceled_fragment_count->increment(to_cancel.size()); for (auto& id : to_cancel) { - cancel(id, PPlanFragmentCancelReason::TIMEOUT); + cancel_fragment(id, PPlanFragmentCancelReason::TIMEOUT); LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout fragment " << print_id(id); } + + for (const auto& qid : queries_to_cancel) { + cancel_query(qid, PPlanFragmentCancelReason::INTERNAL_ERROR, + std::string("Coordinator dead.")); + } } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1))); LOG(INFO) << "FragmentMgr cancel worker is going to exit."; } diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index cdc01627a0..8548d19d78 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -93,15 +93,28 @@ public: Status start_query_execution(const PExecPlanFragmentStartRequest* request); - void cancel(const TUniqueId& fragment_id) { - cancel(fragment_id, PPlanFragmentCancelReason::INTERNAL_ERROR); - } - - void cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason, - const std::string& msg = ""); - + // This method can only be used to cancel a fragment of non-pipeline query. + void cancel_fragment(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason, + const std::string& msg = ""); + void cancel_fragment_unlocked(const TUniqueId& instance_id, + const PPlanFragmentCancelReason& reason, + const std::unique_lock<std::mutex>& state_lock, + const std::string& msg = ""); + + // Pipeline version, cancel a fragment instance. + void cancel_instance(const TUniqueId& instance_id, const PPlanFragmentCancelReason& reason, + const std::string& msg = ""); + void cancel_instance_unlocked(const TUniqueId& instance_id, + const PPlanFragmentCancelReason& reason, + const std::unique_lock<std::mutex>& state_lock, + const std::string& msg = ""); + + // Can be used in both version. 
void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason, const std::string& msg = ""); + void cancel_query_unlocked(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason, + const std::unique_lock<std::mutex>& state_lock, + const std::string& msg = ""); bool query_is_canceled(const TUniqueId& query_id); @@ -132,6 +145,10 @@ public: ThreadPool* get_thread_pool() { return _thread_pool.get(); } private: + void cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason, + const std::unique_lock<std::mutex>& state_lock, bool is_pipeline, + const std::string& msg = ""); + void _exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor, const FinishCallback& cb); diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/frontend_info.h similarity index 83% copy from be/src/runtime/exec_env.cpp copy to be/src/runtime/frontend_info.h index 8f63d6fe6a..c16d63096f 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/frontend_info.h @@ -15,17 +15,16 @@ // specific language governing permissions and limitations // under the License. 
-#include "runtime/exec_env.h" - #include <gen_cpp/HeartbeatService_types.h> -namespace doris { +#include <ctime> -ExecEnv::ExecEnv() : _is_init(false) {} +namespace doris { -ExecEnv::~ExecEnv() {} +struct FrontendInfo { + TFrontendInfo info; + std::time_t first_receiving_time_ms; + std::time_t last_reveiving_time_ms; +}; -const std::string& ExecEnv::token() const { - return _master_info->token; -} } // namespace doris diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index d53a3731d8..92974c73a2 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -47,7 +47,7 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( } void ThreadMemTrackerMgr::cancel_fragment(const std::string& exceed_msg) { - ExecEnv::GetInstance()->fragment_mgr()->cancel( + ExecEnv::GetInstance()->fragment_mgr()->cancel_instance( _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg); } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 105ac80b81..7882b21c8d 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -217,6 +217,8 @@ public: return _query_options.be_exec_version; } + [[nodiscard]] int64_t get_fe_process_uuid() const { return _query_options.fe_process_uuid; } + RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); } TUniqueId query_id() const { return _query_id; } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index c04091519a..76e22fe084 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -40,6 +40,7 @@ #include "common/factory_creator.h" #include "common/status.h" #include "gutil/integral_types.h" +#include "util/debug_util.h" #include "util/runtime_profile.h" #include "util/telemetry/telemetry.h" @@ -173,7 +174,9 @@ public: _is_cancelled.store(v); // Create a error status, so that we could 
print error stack, and // we could know which path call cancel. - LOG(INFO) << "task is cancelled, st = " << Status::Error<ErrorCode::CANCELLED>(msg); + LOG(WARNING) << "Task is cancelled, instance: " + << PrintInstanceStandardInfo(_query_id, _fragment_id, _fragment_instance_id) + << " st = " << Status::Error<ErrorCode::CANCELLED>(msg); } void set_backend_id(int64_t backend_id) { _backend_id = backend_id; } diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 21c5c7b4f6..90419b0b5f 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -118,7 +118,8 @@ Status BackendService::start_plan_fragment_execution(const TExecPlanFragmentPara void BackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val, const TCancelPlanFragmentParams& params) { LOG(INFO) << "cancel_plan_fragment(): instance_id=" << params.fragment_instance_id; - _exec_env->fragment_mgr()->cancel(params.fragment_instance_id); + _exec_env->fragment_mgr()->cancel_instance(params.fragment_instance_id, + PPlanFragmentCancelReason::INTERNAL_ERROR); } void BackendService::transmit_data(TTransmitDataResult& return_val, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index efd2a0af46..5effa721e1 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -559,10 +559,11 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* if (request->has_cancel_reason()) { LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) << ", reason: " << PPlanFragmentCancelReason_Name(request->cancel_reason()); - _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason()); + _exec_env->fragment_mgr()->cancel_instance(tid, request->cancel_reason()); } else { LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); - _exec_env->fragment_mgr()->cancel(tid); + _exec_env->fragment_mgr()->cancel_instance(tid, + 
PPlanFragmentCancelReason::INTERNAL_ERROR); } // TODO: the logic seems useless, cancel only return Status::OK. remove it st.to_protobuf(result->mutable_status()); diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp index 9ea88acc25..2d44c281a5 100644 --- a/be/src/util/debug_util.cpp +++ b/be/src/util/debug_util.cpp @@ -17,6 +17,7 @@ #include "util/debug_util.h" +#include <gen_cpp/HeartbeatService_types.h> #include <gen_cpp/PlanNodes_types.h> #include <stdint.h> @@ -26,6 +27,7 @@ #include <utility> #include "common/version_internal.h" +#include "util/uid_util.h" namespace doris { @@ -101,4 +103,35 @@ std::string hexdump(const char* buf, int len) { return ss.str(); } +std::string PrintThriftNetworkAddress(const TNetworkAddress& add) { + std::stringstream ss; + add.printTo(ss); + return ss.str(); +} + +std::string PrintFrontendInfos(const std::vector<TFrontendInfo>& fe_infos) { + std::stringstream ss; + const size_t count = fe_infos.size(); + + for (int i = 0; i < count; ++i) { + fe_infos[i].printTo(ss); + ss << ' '; + } + + return ss.str(); +} + +std::string PrintFrontendInfo(const TFrontendInfo& fe_info) { + std::stringstream ss; + fe_info.printTo(ss); + + return ss.str(); +} + +std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid, const TUniqueId& iid) { + std::stringstream ss; + ss << print_id(iid) << '|' << fid << '|' << print_id(qid); + return ss.str(); +} + } // namespace doris diff --git a/be/src/util/debug_util.h b/be/src/util/debug_util.h index fbc0c221f4..e6b6491b8a 100644 --- a/be/src/util/debug_util.h +++ b/be/src/util/debug_util.h @@ -17,6 +17,7 @@ #pragma once +#include <gen_cpp/HeartbeatService_types.h> #include <gen_cpp/Metrics_types.h> #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/Types_types.h> @@ -30,6 +31,14 @@ std::string print_tstmt_type(const TStmtType::type& type); std::string print_query_state(const QueryState::type& type); std::string PrintTUnit(const TUnit::type& type); std::string 
PrintTMetricKind(const TMetricKind::type& type); +std::string PrintThriftNetworkAddress(const TNetworkAddress&); +std::string PrintFrontendInfo(const TFrontendInfo& fe_info); +std::string PrintFrontendInfos(const std::vector<TFrontendInfo>& fe_infos); + +// Ideally, call this function whenever we need to print instance information. +// By using a fixed format, we would be able to identify all the paths in which this instance is executed. +// InstanceId|FragmentIdx|QueryId +std::string PrintInstanceStandardInfo(const TUniqueId& qid, const int fid, const TUniqueId& iid); // Returns a string "<product version number> (build <build hash>)" // If compact == false, this string is appended: "\nBuilt on <build time>" diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index ab23ca5064..af86bb32ef 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -241,6 +241,7 @@ import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.task.PriorityMasterTaskExecutor; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TCompressionType; +import org.apache.doris.thrift.TFrontendInfo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector; @@ -477,6 +478,19 @@ public class Env { private HiveTransactionMgr hiveTransactionMgr; + public List<TFrontendInfo> getFrontendInfos() { + List<TFrontendInfo> res = new ArrayList<>(); + + for (Frontend fe : frontends.values()) { + TFrontendInfo feInfo = new TFrontendInfo(); + feInfo.setCoordinatorAddress(new TNetworkAddress(fe.getHost(), fe.getRpcPort())); + feInfo.setProcessUuid(fe.getProcessUUID()); + res.add(feInfo); + } + + return res; + } + public List<Frontend> getFrontends(FrontendNodeType nodeType) { if
(nodeType == null) { // get all diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 7a1278266c..1c97dadf3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -72,6 +72,7 @@ import org.apache.doris.proto.Types.PUniqueId; import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -399,6 +400,7 @@ public class Coordinator { this.queryOptions.setQueryTimeout(context.getExecTimeout()); this.queryOptions.setExecutionTimeout(context.getExecTimeout()); this.queryOptions.setEnableScanNodeRunSerial(context.getSessionVariable().isEnableScanRunSerial()); + this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID()); } public ConnectContext getConnectContext() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java index f7ba622b7e..7b9721464e 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/ExecuteEnv.java @@ -30,14 +30,14 @@ public class ExecuteEnv { private static volatile ExecuteEnv INSTANCE; private MultiLoadMgr multiLoadMgr; private ConnectScheduler scheduler; - private long startupTime; + private long processUUID; private List<FeDiskInfo> diskInfos; private ExecuteEnv() { multiLoadMgr = new MultiLoadMgr(); scheduler = new ConnectScheduler(Config.qe_max_connection); - startupTime = System.currentTimeMillis(); + processUUID = System.currentTimeMillis(); diskInfos = new ArrayList<FeDiskInfo>() {{ add(new 
FeDiskInfo("meta", Config.meta_dir, DiskUtils.df(Config.meta_dir))); add(new FeDiskInfo("log", Config.sys_log_dir, DiskUtils.df(Config.sys_log_dir))); @@ -65,8 +65,8 @@ public class ExecuteEnv { return multiLoadMgr; } - public long getStartupTime() { - return startupTime; + public long getProcessUUID() { + return processUUID; } public List<FeDiskInfo> getDiskInfos() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index db7514a3be..dfb88c7610 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1933,6 +1933,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { } txnState.addTableIndexes(table); plan.setTableName(table.getName()); + plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID()); return plan; } finally { table.readUnlock(); @@ -2033,7 +2034,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setQueryPort(Config.query_port); result.setRpcPort(Config.rpc_port); result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH); - result.setLastStartupTime(exeEnv.getStartupTime()); + result.setLastStartupTime(exeEnv.getProcessUUID()); if (exeEnv.getDiskInfos() != null) { result.setDiskInfos(FeDiskInfo.toThrifts(exeEnv.getDiskInfos())); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java index 35964f2b2a..4f16fa1629 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java @@ -60,6 +60,8 @@ public class Frontend implements Writable { private boolean isAlive = false; + private long processUUID = 0; + public Frontend() { } @@ -122,6 +124,10 @@ public class Frontend 
implements Writable { return lastStartupTime; } + public long getProcessUUID() { + return processUUID; + } + public long getLastUpdateTime() { return lastUpdateTime; } @@ -150,10 +156,16 @@ public class Frontend implements Writable { replayedJournalId = hbResponse.getReplayedJournalId(); lastUpdateTime = hbResponse.getHbTime(); heartbeatErrMsg = ""; - lastStartupTime = hbResponse.getFeStartTime(); + lastStartupTime = hbResponse.getProcessUUID(); diskInfos = hbResponse.getDiskInfos(); isChanged = true; + processUUID = lastStartupTime; } else { + // A non-master node disconnected. + // Set processUUID to zero; the BE's heartbeat manager ignores a zero UUID in this heartbeat, + // so its cancel worker will not immediately cancel queries from this FE; + // it waits until it receives a valid process UUID. + processUUID = 0; if (isAlive) { isAlive = false; isChanged = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java index 8ad4f36e6d..54b9344ac2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java @@ -19,6 +19,7 @@ package org.apache.doris.system; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FeDiskInfo; import com.google.gson.annotations.SerializedName; @@ -41,7 +42,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { @SerializedName(value = "replayedJournalId") private long replayedJournalId; private String version; - private long feStartTime; + private long processUUID; private List<FeDiskInfo> diskInfos; public FrontendHbResponse() { @@ -50,7 +51,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { public FrontendHbResponse(String name, int 
queryPort, int rpcPort, long replayedJournalId, long
hbTime, String version, - long feStartTime, List<FeDiskInfo> diskInfos) { + long processUUID, List<FeDiskInfo> diskInfos) { super(HeartbeatResponse.Type.FRONTEND); this.status = HbStatus.OK; this.name = name; @@ -59,7 +60,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { this.replayedJournalId = replayedJournalId; this.hbTime = hbTime; this.version = version; - this.feStartTime = feStartTime; + this.processUUID = processUUID; this.diskInfos = diskInfos; } @@ -68,6 +69,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { this.status = HbStatus.BAD; this.name = name; this.msg = errMsg; + this.processUUID = ExecuteEnv.getInstance().getProcessUUID(); } public String getName() { @@ -90,8 +92,8 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { return version; } - public long getFeStartTime() { - return feStartTime; + public long getProcessUUID() { + return processUUID; } public List<FeDiskInfo> getDiskInfos() { @@ -116,7 +118,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { sb.append(", queryPort: ").append(queryPort); sb.append(", rpcPort: ").append(rpcPort); sb.append(", replayedJournalId: ").append(replayedJournalId); - sb.append(", festartTime: ").append(feStartTime); + sb.append(", festartTime: ").append(processUUID); return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 6cfcff34e0..87c7073142 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -39,6 +39,7 @@ import org.apache.doris.thrift.TBrokerOperationStatus; import org.apache.doris.thrift.TBrokerOperationStatusCode; import org.apache.doris.thrift.TBrokerPingBrokerRequest; import org.apache.doris.thrift.TBrokerVersion; +import 
org.apache.doris.thrift.TFrontendInfo; import org.apache.doris.thrift.TFrontendPingFrontendRequest; import org.apache.doris.thrift.TFrontendPingFrontendResult; import org.apache.doris.thrift.TFrontendPingFrontendStatusCode; @@ -101,11 +102,12 @@ public class HeartbeatMgr extends MasterDaemon { */ @Override protected void runAfterCatalogReady() { + // Get feInfos of previous iteration. + List<TFrontendInfo> feInfos = Env.getCurrentEnv().getFrontendInfos(); List<Future<HeartbeatResponse>> hbResponses = Lists.newArrayList(); - // send backend heartbeat for (Backend backend : nodeMgr.getIdToBackend().values()) { - BackendHeartbeatHandler handler = new BackendHeartbeatHandler(backend); + BackendHeartbeatHandler handler = new BackendHeartbeatHandler(backend, feInfos); hbResponses.add(executor.submit(handler)); } @@ -204,9 +206,11 @@ public class HeartbeatMgr extends MasterDaemon { // backend heartbeat private class BackendHeartbeatHandler implements Callable<HeartbeatResponse> { private Backend backend; + private List<TFrontendInfo> feInfos; - public BackendHeartbeatHandler(Backend backend) { + public BackendHeartbeatHandler(Backend backend, List<TFrontendInfo> feInfos) { this.backend = backend; + this.feInfos = feInfos; } @Override @@ -222,6 +226,7 @@ public class HeartbeatMgr extends MasterDaemon { long flags = heartbeatFlags.getHeartbeatFlags(); copiedMasterInfo.setHeartbeatFlags(flags); copiedMasterInfo.setBackendId(backendId); + copiedMasterInfo.setFrontendInfos(feInfos); THeartbeatResult result; if (!FeConstants.runningUnitTest) { client = ClientPool.backendHeartbeatPool.borrowObject(beAddr); @@ -301,7 +306,7 @@ public class HeartbeatMgr extends MasterDaemon { return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port, Env.getCurrentEnv().getMaxJournalId(), System.currentTimeMillis(), Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH, - ExecuteEnv.getInstance().getStartupTime(), ExecuteEnv.getInstance().getDiskInfos()); + 
ExecuteEnv.getInstance().getProcessUUID(), ExecuteEnv.getInstance().getDiskInfos()); } else { return new FrontendHbResponse(fe.getNodeName(), "not ready"); } diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 38ea17e907..8361fcbf61 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -33,6 +33,7 @@ struct TMasterInfo { 6: optional Types.TPort http_port 7: optional i64 heartbeat_flags 8: optional i64 backend_id + 9: optional list<TFrontendInfo> frontend_infos } struct TBackendInfo { @@ -53,3 +54,8 @@ struct THeartbeatResult { service HeartbeatService { THeartbeatResult heartbeat(1:TMasterInfo master_info); } + +struct TFrontendInfo { + 1: optional Types.TNetworkAddress coordinator_address + 2: optional i64 process_uuid +} \ No newline at end of file diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index a87238dfff..2f20057840 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -237,6 +237,9 @@ struct TQueryOptions { 80: optional bool enable_memtable_on_sink_node = false; 81: optional bool enable_delete_sub_predicate_v2 = false; + + // A tag used to distinguish fe start epoch. + 82: optional i64 fe_process_uuid = 0; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org