This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b0f22f044c1 [opt](query cancel) cancel query if it has pipeline task leakage (#39223) b0f22f044c1 is described below commit b0f22f044c15cd7b95d1bcb52fde2fa7f6a4e686 Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Sat Aug 17 16:40:48 2024 +0800 [opt](query cancel) cancel query if it has pipeline task leakage (#39223) * Problem We are currently facing an issue where pipeline tasks leak in certain situations. A pipeline task leak is the scenario where a query has already completed, but its associated data structures still persist on the backend (BE). This can lead to some memory or computational resources on the BE never being released. * Fix We will periodically reconcile queries with the frontend (FE) in the cancel worker thread. Once we detect that a query has completed on the FE but still exists on the BE, we will cancel the query to promptly release its resources. To avoid mistakenly triggering cancellations, we employ a conservative strategy. For instance, we will not proactively cancel queries if we detect that any FE is in an abnormal state or if there are network errors. 
--- be/src/common/config.cpp | 2 + be/src/common/config.h | 2 + be/src/io/fs/multi_table_pipe.cpp | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 6 + be/src/runtime/fragment_mgr.cpp | 179 +++++++++++++++++++-- be/src/runtime/fragment_mgr.h | 12 +- be/src/runtime/frontend_info.h | 6 + be/src/runtime/group_commit_mgr.cpp | 3 +- be/src/runtime/query_context.cpp | 25 ++- be/src/runtime/query_context.h | 19 ++- .../runtime/stream_load/stream_load_executor.cpp | 5 +- be/src/service/backend_service.cpp | 3 +- be/src/service/internal_service.cpp | 18 ++- .../apache/doris/service/FrontendServiceImpl.java | 21 +++ gensrc/thrift/FrontendService.thrift | 11 ++ 15 files changed, 279 insertions(+), 35 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index a671bbea708..78805a58ac6 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1330,6 +1330,8 @@ DEFINE_mBool(enable_hdfs_mem_limiter, "true"); DEFINE_mInt16(topn_agg_limit_multiplier, "2"); +DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 73d2c24b04d..aeaeea30ae8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1427,6 +1427,8 @@ DECLARE_mBool(enable_hdfs_mem_limiter); // we should do agg limit opt DECLARE_mInt16(topn_agg_limit_multiplier); +DECLARE_mInt64(pipeline_task_leakage_detect_period_secs); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index d7fdd8a738b..357abee9d0f 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -251,7 +251,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para _inflight_cnt++; RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment( - plan, [this, plan](RuntimeState* state, Status* status) { + plan, QuerySource::ROUTINE_LOAD, 
[this, plan](RuntimeState* state, Status* status) { DCHECK(state); auto pair = _planned_tables.find(plan.table_name); if (pair == _planned_tables.end()) { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 928191562b1..c3942e8286e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -176,6 +176,12 @@ void PipelineFragmentContext::cancel(const Status reason) { if (reason.is<ErrorCode::TIMEOUT>()) { LOG(WARNING) << "PipelineFragmentContext is cancelled due to timeout : " << debug_string(); } + + if (reason.is<ErrorCode::ILLEGAL_STATE>()) { + LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}", + this->debug_string()); + } + _query_ctx->cancel(reason, _fragment_id); if (reason.is<ErrorCode::LIMIT_REACH>()) { _is_report_on_cancel = false; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 13c33e79c9a..2e69bb34152 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -17,6 +17,7 @@ #include "runtime/fragment_mgr.h" +#include <bits/types/struct_timespec.h> #include <bvar/latency_recorder.h> #include <exprs/runtime_filter.h> #include <fmt/format.h> @@ -34,6 +35,7 @@ #include <gen_cpp/internal_service.pb.h> #include <pthread.h> #include <stddef.h> +#include <thrift/TApplicationException.h> #include <thrift/Thrift.h> #include <thrift/protocol/TDebugProtocol.h> #include <thrift/transport/TTransportException.h> @@ -45,11 +47,13 @@ #include "common/status.h" // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep +#include <cstdint> #include <map> #include <memory> #include <mutex> #include <sstream> #include <unordered_map> +#include <unordered_set> #include <utility> #include "cloud/config.h" @@ -133,6 +137,103 @@ std::string to_load_error_http_path(const std::string& file_name) { using apache::thrift::TException; using 
apache::thrift::transport::TTransportException; +static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info, + std::unordered_set<TUniqueId>& query_set) { + TFetchRunningQueriesResult rpc_result; + TFetchRunningQueriesRequest rpc_request; + + Status client_status; + const int32 timeout_ms = 3 * 1000; + FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), + fe_info.info.coordinator_address, timeout_ms, + &client_status); + // Abort this fe. + if (!client_status.ok()) { + LOG_WARNING("Failed to get client for {}, reason is {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + client_status.to_string()); + return Status::InternalError("Failed to get client for {}, reason is {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + client_status.to_string()); + } + + // do rpc + try { + try { + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } catch (const apache::thrift::transport::TTransportException& e) { + LOG_WARNING("Transport exception reason: {}, reopening", e.what()); + client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms); + if (!client_status.ok()) { + LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack()); + return Status::InternalError("Reopen failed, reason: {}", + client_status.to_string_no_stack()); + } + + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } + } catch (apache::thrift::TException& e) { + // During upgrading cluster or meet any other network error. + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what()); + return Status::InternalError("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + e.what()); + } + + // Avoid logic error in frontend. 
+ if (rpc_result.__isset.status == false || rpc_result.status.status_code != TStatusCode::OK) { + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + doris::to_string(rpc_result.status.status_code)); + return Status::InternalError("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + doris::to_string(rpc_result.status.status_code)); + } + + if (rpc_result.__isset.running_queries == false) { + return Status::InternalError("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + "running_queries is not set"); + } + + query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(), + rpc_result.running_queries.end()); + return Status::OK(); +}; + +static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() { + const std::map<TNetworkAddress, FrontendInfo>& running_fes = + ExecEnv::GetInstance()->get_running_frontends(); + + std::map<int64_t, std::unordered_set<TUniqueId>> result; + std::vector<FrontendInfo> qualified_fes; + + for (const auto& fe : running_fes) { + // Only consider normal frontend. + if (fe.first.port != 0 && fe.second.info.process_uuid != 0) { + qualified_fes.push_back(fe.second); + } else { + return {}; + } + } + + for (const auto& fe_addr : qualified_fes) { + const int64_t process_uuid = fe_addr.info.process_uuid; + std::unordered_set<TUniqueId> query_set; + Status st = _do_fetch_running_queries_rpc(fe_addr, query_set); + if (!st.ok()) { + // Empty result, cancel worker will not do anything + return {}; + } + + // frontend_info and process_uuid has been checked in rpc threads. 
+ result[process_uuid] = query_set; + } + + return result; +} + FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _exec_env(exec_env), _stop_background_threads_latch(1) { _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr"); @@ -467,11 +568,13 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { static void empty_function(RuntimeState*, Status*) {} -Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { +Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, + const QuerySource query_source) { return Status::InternalError("Non-pipeline is disabled!"); } -Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) { +Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, + const QuerySource query_source) { if (params.txn_conf.need_txn) { std::shared_ptr<StreamLoadContext> stream_load_ctx = std::make_shared<StreamLoadContext>(_exec_env); @@ -503,7 +606,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) { RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx)); return Status::OK(); } else { - return exec_plan_fragment(params, empty_function); + return exec_plan_fragment(params, query_source, empty_function); } } @@ -566,6 +669,7 @@ std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx_with_lock( template <typename Params> Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, + QuerySource query_source, std::shared_ptr<QueryContext>& query_ctx) { DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); }); @@ -597,9 +701,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo // This may be a first fragment request of the query. // Create the query fragments context. 
- query_ctx = - QueryContext::create_shared(query_id, _exec_env, params.query_options, params.coord, - pipeline, params.is_nereids, params.current_connect_fe); + query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options, + params.coord, pipeline, params.is_nereids, + params.current_connect_fe, query_source); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl))); @@ -648,7 +752,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, - const FinishCallback& cb) { + QuerySource query_source, const FinishCallback& cb) { return Status::InternalError("Non-pipeline is disabled!"); } @@ -690,7 +794,7 @@ std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) { } Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, - const FinishCallback& cb) { + QuerySource query_source, const FinishCallback& cb) { VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is " << apache::thrift::ThriftDebugString(params).c_str(); // sometimes TExecPlanFragmentParams debug string is too long and glog @@ -699,7 +803,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, << apache::thrift::ThriftDebugString(params.query_options).c_str(); std::shared_ptr<QueryContext> query_ctx; - RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx)); + RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_source, query_ctx)); SCOPED_ATTACH_TASK(query_ctx.get()); int64_t duration_ns = 0; std::shared_ptr<pipeline::PipelineFragmentContext> context = @@ -830,12 +934,30 @@ void FragmentMgr::cancel_instance(const TUniqueId instance_id, const Status reas void FragmentMgr::cancel_worker() { LOG(INFO) << "FragmentMgr cancel worker start working."; 
+ + timespec check_invalid_query_last_timestamp; + clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp); + do { std::vector<TUniqueId> queries_lost_coordinator; std::vector<TUniqueId> queries_timeout; + std::vector<TUniqueId> queries_pipeline_task_leak; + // Fe process uuid -> set<QueryId> + std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes; + const std::map<TNetworkAddress, FrontendInfo>& running_fes = + ExecEnv::GetInstance()->get_running_frontends(); timespec now; clock_gettime(CLOCK_MONOTONIC, &now); + + if (now.tv_sec - check_invalid_query_last_timestamp.tv_sec > + config::pipeline_task_leakage_detect_period_secs) { + check_invalid_query_last_timestamp = now; + running_queries_on_all_fes = _get_all_running_queries_from_fe(); + } else { + running_queries_on_all_fes.clear(); + } + { std::lock_guard<std::mutex> lock(_lock); for (auto& pipeline_itr : _pipeline_map) { @@ -855,8 +977,6 @@ void FragmentMgr::cancel_worker() { } } - const auto& running_fes = ExecEnv::GetInstance()->get_running_frontends(); - // We use a very conservative cancel strategy. // 0. If there are no running frontends, do not cancel any queries. // 1. If query's process uuid is zero, do not cancel @@ -870,15 +990,35 @@ void FragmentMgr::cancel_worker() { } else { for (const auto& it : _query_ctx_map) { if (auto q_ctx = it.second.lock()) { - if (q_ctx->get_fe_process_uuid() == 0) { + const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid(); + + if (fe_process_uuid == 0) { // zero means this query is from a older version fe or // this fe is starting continue; } + // If the query is not running on the any frontends, cancel it. 
+ if (auto itr = running_queries_on_all_fes.find(fe_process_uuid); + itr != running_queries_on_all_fes.end()) { + // Query not found on this frontend, and the query arrives before the last check + if (itr->second.find(it.first) == itr->second.end() && + q_ctx->get_query_arrival_timestamp().tv_nsec < + check_invalid_query_last_timestamp.tv_nsec && + q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) { + queries_pipeline_task_leak.push_back(q_ctx->query_id()); + LOG_INFO( + "Query {}, type {} is not found on any frontends, maybe it " + "is leaked.", + print_id(q_ctx->query_id()), + toString(q_ctx->get_query_source())); + continue; + } + } + auto itr = running_fes.find(q_ctx->coord_addr); if (itr != running_fes.end()) { - if (q_ctx->get_fe_process_uuid() == itr->second.info.process_uuid || + if (fe_process_uuid == itr->second.info.process_uuid || itr->second.info.process_uuid == 0) { continue; } else { @@ -932,9 +1072,18 @@ void FragmentMgr::cancel_worker() { "FragmentMgr cancel worker going to cancel timeout instance ")); } + for (const auto& qid : queries_pipeline_task_leak) { + // Cancel the query, and maybe try to report debug info to fe so that we can + // collect debug info by sql or http api instead of search log. 
+ cancel_query(qid, Status::Error<ErrorCode::ILLEGAL_STATE>( + "Potential pipeline task leakage")); + } + for (const auto& qid : queries_lost_coordinator) { - cancel_query(qid, Status::InternalError("Coordinator dead.")); + cancel_query(qid, Status::Error<ErrorCode::CANCELLED>( + "Source frontend is not running or restarted")); } + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1))); LOG(INFO) << "FragmentMgr cancel worker is going to exit."; } @@ -1032,7 +1181,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, exec_fragment_params.__set_query_options(query_options); VLOG_ROW << "external exec_plan_fragment params is " << apache::thrift::ThriftDebugString(exec_fragment_params).c_str(); - return exec_plan_fragment(exec_fragment_params); + return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR); } Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index cb32bc5e77e..bc066066f7b 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -81,17 +81,19 @@ public: void stop(); // execute one plan fragment - Status exec_plan_fragment(const TExecPlanFragmentParams& params); + Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type); - Status exec_plan_fragment(const TPipelineFragmentParams& params); + Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type); void remove_pipeline_context( std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context); // TODO(zc): report this is over - Status exec_plan_fragment(const TExecPlanFragmentParams& params, const FinishCallback& cb); + Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type, + const FinishCallback& cb); - Status exec_plan_fragment(const TPipelineFragmentParams& params, const FinishCallback& cb); + 
Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type, + const FinishCallback& cb); Status start_query_execution(const PExecPlanFragmentStartRequest* request); @@ -155,7 +157,7 @@ private: template <typename Params> Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, - std::shared_ptr<QueryContext>& query_ctx); + QuerySource query_type, std::shared_ptr<QueryContext>& query_ctx); // This is input params ExecEnv* _exec_env = nullptr; diff --git a/be/src/runtime/frontend_info.h b/be/src/runtime/frontend_info.h index a7e4b3f999b..c4d3d710b3c 100644 --- a/be/src/runtime/frontend_info.h +++ b/be/src/runtime/frontend_info.h @@ -17,6 +17,7 @@ #pragma once #include <gen_cpp/HeartbeatService_types.h> +#include <gen_cpp/Types_types.h> #include <ctime> @@ -28,4 +29,9 @@ struct FrontendInfo { std::time_t last_reveiving_time_ms; }; +struct FrontendAddrAndRunningQueries { + TNetworkAddress frontend_addr; + std::set<TUniqueId> running_queries; +}; + } // namespace doris diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 2383f25afc8..3250379cf85 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -582,7 +582,8 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id, << ", st=" << finish_st.to_string(); } }; - return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, finish_cb); + return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, + QuerySource::GROUP_COMMIT_LOAD, finish_cb); } Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id, diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 04bcb16cc8a..f10a0a5edca 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -55,15 +55,34 @@ public: std::unique_ptr<ThreadPoolToken> token_; }; +const std::string toString(QuerySource queryType) { + switch 
(queryType) { + case QuerySource::INTERNAL_FRONTEND: + return "INTERNAL_FRONTEND"; + case QuerySource::STREAM_LOAD: + return "STREAM_LOAD"; + case QuerySource::GROUP_COMMIT_LOAD: + return "EXTERNAL_QUERY"; + case QuerySource::ROUTINE_LOAD: + return "ROUTINE_LOAD"; + case QuerySource::EXTERNAL_CONNECTOR: + return "EXTERNAL_CONNECTOR"; + default: + return "UNKNOWN"; + } +} + QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options, TNetworkAddress coord_addr, - bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe) + bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe, + QuerySource query_source) : _timeout_second(-1), _query_id(query_id), _exec_env(exec_env), _is_pipeline(is_pipeline), _is_nereids(is_nereids), - _query_options(query_options) { + _query_options(query_options), + _query_source(query_source) { _init_query_mem_tracker(); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker); _query_watcher.start(); @@ -89,7 +108,7 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0; DCHECK_EQ(is_report_fe_addr_valid, true); } - + clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp); register_memory_statistics(); register_cpu_statistics(); } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 3241010c20e..006305bf599 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -63,6 +63,16 @@ struct ReportStatusRequest { std::function<void(const Status&)> cancel_fn; }; +enum class QuerySource { + INTERNAL_FRONTEND, + STREAM_LOAD, + GROUP_COMMIT_LOAD, + ROUTINE_LOAD, + EXTERNAL_CONNECTOR +}; + +const std::string toString(QuerySource query_source); + // Save the common components of fragments in a query. 
// Some components like DescriptorTbl may be very large // that will slow down each execution of fragments when DeSer them every time. @@ -73,7 +83,7 @@ class QueryContext { public: QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options, TNetworkAddress coord_addr, bool is_pipeline, bool is_nereids, - TNetworkAddress current_connect_fe); + TNetworkAddress current_connect_fe, QuerySource query_type); ~QueryContext(); @@ -310,6 +320,10 @@ private: std::atomic<int64_t> _spill_threshold {0}; std::mutex _profile_mutex; + timespec _query_arrival_timestamp; + // Distinguish the query source, for query that comes from fe, we will have some memory structure on FE to + // help us manage the query. + QuerySource _query_source; // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile // flatten profile of one fragment: @@ -348,6 +362,9 @@ public: bool enable_profile() const { return _query_options.__isset.enable_profile && _query_options.enable_profile; } + + timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; } + QuerySource get_query_source() const { return this->_query_source; } }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 4ddd29ac9c3..4b0788186a0 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -147,10 +147,11 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); if (ctx->put_result.__isset.params) { - st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, exec_fragment); + st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, + QuerySource::STREAM_LOAD, exec_fragment); } else { st = 
_exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params, - exec_fragment); + QuerySource::STREAM_LOAD, exec_fragment); } if (!st.ok()) { diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 9b63439a634..aa29661da02 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -653,7 +653,8 @@ Status BaseBackendService::start_plan_fragment_execution( if (!exec_params.fragment.__isset.output_sink) { return Status::InternalError("missing sink in plan fragment"); } - return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params); + return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params, + QuerySource::INTERNAL_FRONTEND); } void BaseBackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2492d2a846b..c2251c240ae 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -514,9 +514,11 @@ Status PInternalService::_exec_plan_fragment_impl( RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); } if (cb) { - return _exec_env->fragment_mgr()->exec_plan_fragment(t_request, cb); + return _exec_env->fragment_mgr()->exec_plan_fragment( + t_request, QuerySource::INTERNAL_FRONTEND, cb); } else { - return _exec_env->fragment_mgr()->exec_plan_fragment(t_request); + return _exec_env->fragment_mgr()->exec_plan_fragment(t_request, + QuerySource::INTERNAL_FRONTEND); } } else if (version == PFragmentRequestVersion::VERSION_2) { TExecPlanFragmentParamsList t_request; @@ -531,9 +533,11 @@ Status PInternalService::_exec_plan_fragment_impl( for (const TExecPlanFragmentParams& params : t_request.paramsList) { if (cb) { - RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params, cb)); + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( + params, QuerySource::INTERNAL_FRONTEND, cb)); } else { - 
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( + params, QuerySource::INTERNAL_FRONTEND)); } } @@ -562,9 +566,11 @@ Status PInternalService::_exec_plan_fragment_impl( timer.start(); for (const TPipelineFragmentParams& fragment : fragment_list) { if (cb) { - RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment, cb)); + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( + fragment, QuerySource::INTERNAL_FRONTEND, cb)); } else { - RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment)); + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( + fragment, QuerySource::INTERNAL_FRONTEND)); } } timer.stop(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 1e6d8e987c1..f64f55a47fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -143,6 +143,8 @@ import org.apache.doris.thrift.TDropPlsqlPackageRequest; import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest; import org.apache.doris.thrift.TFeResult; import org.apache.doris.thrift.TFetchResourceResult; +import org.apache.doris.thrift.TFetchRunningQueriesRequest; +import org.apache.doris.thrift.TFetchRunningQueriesResult; import org.apache.doris.thrift.TFetchSchemaTableDataRequest; import org.apache.doris.thrift.TFetchSchemaTableDataResult; import org.apache.doris.thrift.TFetchSplitBatchRequest; @@ -3982,4 +3984,23 @@ public class FrontendServiceImpl implements FrontendService.Iface { return new TStatus(TStatusCode.OK); } + @Override + public TFetchRunningQueriesResult fetchRunningQueries(TFetchRunningQueriesRequest request) { + TFetchRunningQueriesResult result = new TFetchRunningQueriesResult(); + if 
(!Env.getCurrentEnv().isReady()) { + result.setStatus(new TStatus(TStatusCode.ILLEGAL_STATE)); + return result; + } + + List<TUniqueId> runningQueries = Lists.newArrayList(); + List<Coordinator> allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators(); + + for (Coordinator coordinator : allCoordinators) { + runningQueries.add(coordinator.getQueryId()); + } + + result.setStatus(new TStatus(TStatusCode.OK)); + result.setRunningQueries(runningQueries); + return result; + } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index a3d2ad26967..b6e4aacf656 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -30,6 +30,7 @@ include "RuntimeProfile.thrift" include "MasterService.thrift" include "AgentService.thrift" include "DataSinks.thrift" +include "HeartbeatService.thrift" // These are supporting structs for JniFrontend.java, which serves as the glue // between our C++ execution environment and the Java frontend. @@ -1561,6 +1562,14 @@ struct TFetchSplitBatchResult { 2: optional Status.TStatus status } +struct TFetchRunningQueriesResult { + 1: optional Status.TStatus status + 2: optional list<Types.TUniqueId> running_queries +} + +struct TFetchRunningQueriesRequest { +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1655,4 +1664,6 @@ service FrontendService { TFetchSplitBatchResult fetchSplitBatch(1: TFetchSplitBatchRequest request) Status.TStatus updatePartitionStatsCache(1: TUpdateFollowerPartitionStatsCacheRequest request) + + TFetchRunningQueriesResult fetchRunningQueries(1: TFetchRunningQueriesRequest request) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org