This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 830f250a804 [opt](query cancel) cancel query if it has pipeline task leakage #39223 (#39537) 830f250a804 is described below commit 830f250a804ba292474a2838e12892b54c595324 Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Mon Aug 19 14:33:59 2024 +0800 [opt](query cancel) cancel query if it has pipeline task leakage #39223 (#39537) Picks #39223 with some modifications. The optimization is only applied to PipelineX. --- be/src/common/config.cpp | 2 + be/src/common/config.h | 2 + be/src/io/fs/multi_table_pipe.cpp | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 9 + be/src/runtime/fragment_mgr.cpp | 188 +++++++++++++++++++-- be/src/runtime/fragment_mgr.h | 12 +- be/src/runtime/frontend_info.h | 6 + be/src/runtime/group_commit_mgr.cpp | 6 +- be/src/runtime/query_context.cpp | 25 ++- be/src/runtime/query_context.h | 20 ++- .../runtime/stream_load/stream_load_executor.cpp | 5 +- be/src/service/backend_service.cpp | 3 +- be/src/service/internal_service.cpp | 18 +- .../apache/doris/service/FrontendServiceImpl.java | 23 +++ gensrc/thrift/FrontendService.thrift | 11 ++ 15 files changed, 295 insertions(+), 37 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index fe03cc9158f..461e328044d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1270,6 +1270,8 @@ DEFINE_mBool(enable_parquet_page_index, "false"); DEFINE_mBool(ignore_not_found_file_in_external_table, "true"); +DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index afcdc62cb78..570fb61c5f3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1358,6 +1358,8 @@ DECLARE_mBool(enable_parquet_page_index); // Default is true, if set to false, the not found file will result in query failure. DECLARE_mBool(ignore_not_found_file_in_external_table); +DECLARE_mInt64(pipeline_task_leakage_detect_period_secs); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index 4469174211e..75fd05f2ef8 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -249,7 +249,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para _inflight_cnt++; RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment( - plan, [this, plan](RuntimeState* state, Status* status) { + plan, QuerySource::ROUTINE_LOAD, [this, plan](RuntimeState* state, Status* status) { DCHECK(state); auto pair = _planned_tables.find(plan.table_name); if (pair == _planned_tables.end()) { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 236c2790ccf..df7cd2ece28 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -21,6 +21,7 @@ #include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/Planner_types.h> +#include <gen_cpp/types.pb.h> #include <pthread.h> #include <cstdlib> @@ -184,6 +185,14 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, // make result receiver on fe be stuck on rpc forever until timeout... // We need a more detailed discussion.
_query_ctx->cancel(msg, Status::Cancelled(msg)); + + if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) { + if (msg.find("Pipeline task leak.") != std::string::npos) { + LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}", + this->debug_string()); + } + } + if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { _is_report_on_cancel = false; } else { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c61bb82df75..74f42a28b8a 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -17,6 +17,7 @@ #include "runtime/fragment_mgr.h" +#include <bits/types/struct_timespec.h> #include <bvar/latency_recorder.h> #include <exprs/runtime_filter.h> #include <fmt/format.h> @@ -31,8 +32,10 @@ #include <gen_cpp/QueryPlanExtra_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/internal_service.pb.h> +#include <gen_cpp/types.pb.h> #include <pthread.h> #include <stddef.h> +#include <thrift/TApplicationException.h> #include <thrift/Thrift.h> #include <thrift/protocol/TDebugProtocol.h> #include <thrift/transport/TTransportException.h> @@ -44,10 +47,13 @@ #include "pipeline/pipeline_x/pipeline_x_fragment_context.h" // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep +#include <cstdint> #include <map> #include <memory> #include <mutex> #include <sstream> +#include <unordered_map> +#include <unordered_set> #include <utility> #include "common/config.h" @@ -115,6 +121,103 @@ std::string to_load_error_http_path(const std::string& file_name) { using apache::thrift::TException; using apache::thrift::transport::TTransportException; +static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info, + std::unordered_set<TUniqueId>& query_set) { + TFetchRunningQueriesResult rpc_result; + TFetchRunningQueriesRequest rpc_request; + + Status client_status; + const int32 timeout_ms = 3 * 1000; + FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), + fe_info.info.coordinator_address, timeout_ms, + &client_status); + // Abort this fe. + if (!client_status.ok()) { + LOG_WARNING("Failed to get client for {}, reason is {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + client_status.to_string()); + return Status::InternalError("Failed to get client for {}, reason is {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + client_status.to_string()); + } + + // do rpc + try { + try { + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } catch (const apache::thrift::transport::TTransportException& e) { + LOG_WARNING("Transport exception reason: {}, reopening", e.what()); + client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms); + if (!client_status.ok()) { + LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack()); + return Status::InternalError("Reopen failed, reason: {}", + client_status.to_string_no_stack()); + } + + rpc_client->fetchRunningQueries(rpc_result, rpc_request); + } + } catch (apache::thrift::TException& e) { + // During upgrading cluster or meet any other network error. + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what()); + return Status::InternalError("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + e.what()); + } + + // Avoid logic error in frontend. 
+ if (rpc_result.__isset.status == false || rpc_result.status.status_code != TStatusCode::OK) { + LOG_WARNING("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + doris::to_string(rpc_result.status.status_code)); + return Status::InternalError("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + doris::to_string(rpc_result.status.status_code)); + } + + if (rpc_result.__isset.running_queries == false) { + return Status::InternalError("Failed to fetch running queries from {}, reason: {}", + PrintThriftNetworkAddress(fe_info.info.coordinator_address), + "running_queries is not set"); + } + + query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(), + rpc_result.running_queries.end()); + return Status::OK(); +}; + +static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() { + const std::map<TNetworkAddress, FrontendInfo>& running_fes = + ExecEnv::GetInstance()->get_running_frontends(); + + std::map<int64_t, std::unordered_set<TUniqueId>> result; + std::vector<FrontendInfo> qualified_fes; + + for (const auto& fe : running_fes) { + // Only consider normal frontends. + if (fe.first.port != 0 && fe.second.info.process_uuid != 0) { + qualified_fes.push_back(fe.second); + } else { + return {}; + } + } + + for (const auto& fe_addr : qualified_fes) { + const int64_t process_uuid = fe_addr.info.process_uuid; + std::unordered_set<TUniqueId> query_set; + Status st = _do_fetch_running_queries_rpc(fe_addr, query_set); + if (!st.ok()) { + // Empty result; the cancel worker will not do anything + return {}; + } + + // frontend_info and process_uuid have been checked in rpc threads. + result[process_uuid] = query_set; + } + + return result; +} + FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _exec_env(exec_env), _stop_background_threads_latch(1) { _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr"); @@ -506,7 +609,8 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex cb(fragment_executor->runtime_state(), &status); } -Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { +Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, + const QuerySource query_source) { if (params.txn_conf.need_txn) { std::shared_ptr<StreamLoadContext> stream_load_ctx = std::make_shared<StreamLoadContext>(_exec_env); @@ -539,11 +643,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx)); return Status::OK(); } else { - return exec_plan_fragment(params, empty_function); + return exec_plan_fragment(params, query_source, empty_function); } } -Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) { +Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, + const QuerySource query_source) { if (params.txn_conf.need_txn) { std::shared_ptr<StreamLoadContext> stream_load_ctx = std::make_shared<StreamLoadContext>(_exec_env); @@ -575,7 +680,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) { RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx)); return Status::OK(); } else { - return exec_plan_fragment(params, empty_function); + return exec_plan_fragment(params, query_source, empty_function); } } @@ -619,6 +724,7 @@ void
FragmentMgr::remove_pipeline_context( template <typename Params> Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, + QuerySource query_source, std::shared_ptr<QueryContext>& query_ctx) { DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); }); @@ -660,9 +766,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo // This may be a first fragment request of the query. // Create the query fragments context. - query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env, - params.query_options, params.coord, pipeline, - params.is_nereids, current_connect_fe_addr); + query_ctx = QueryContext::create_shared( + query_id, params.fragment_num_on_host, _exec_env, params.query_options, + params.coord, pipeline, params.is_nereids, current_connect_fe_addr, query_source); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl))); @@ -712,7 +818,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, - const FinishCallback& cb) { + QuerySource query_source, const FinishCallback& cb) { VLOG_ROW << "exec_plan_fragment params is " << apache::thrift::ThriftDebugString(params).c_str(); // sometimes TExecPlanFragmentParams debug string is too long and glog @@ -734,8 +840,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine && params.query_options.enable_pipeline_engine; - RETURN_IF_ERROR( - _get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, query_ctx)); + RETURN_IF_ERROR(_get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, + query_source, query_ctx)); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); { // Need lock here, because it will modify fragment ids and std::vector may resize and reallocate @@ -830,7 +936,7 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) { } Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, - const FinishCallback& cb) { + QuerySource query_source, const FinishCallback& cb) { VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is " << apache::thrift::ThriftDebugString(params).c_str(); // sometimes TExecPlanFragmentParams debug string is too long and glog @@ -839,7 +945,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, << apache::thrift::ThriftDebugString(params.query_options).c_str(); std::shared_ptr<QueryContext> query_ctx; - RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx)); + RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_source, query_ctx)); SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, params.query_id); const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine && params.query_options.enable_pipeline_x_engine; @@ -1125,9 +1231,30 @@ void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id void FragmentMgr::cancel_worker() { LOG(INFO) << "FragmentMgr cancel worker start working."; + + timespec check_invalid_query_last_timestamp; + clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp); + do { 
std::vector<TUniqueId> to_cancel; std::vector<TUniqueId> queries_to_cancel; + std::vector<TUniqueId> queries_pipeline_task_leak; + // Fe process uuid -> set<QueryId> + std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes; + const std::map<TNetworkAddress, FrontendInfo>& running_fes = + ExecEnv::GetInstance()->get_running_frontends(); + + timespec now_for_check_invalid_query; + clock_gettime(CLOCK_MONOTONIC, &now_for_check_invalid_query); + + if (now_for_check_invalid_query.tv_sec - check_invalid_query_last_timestamp.tv_sec > + config::pipeline_task_leakage_detect_period_secs) { + check_invalid_query_last_timestamp = now_for_check_invalid_query; + running_queries_on_all_fes = _get_all_running_queries_from_fe(); + } else { + running_queries_on_all_fes.clear(); + } + VecDateTimeValue now = VecDateTimeValue::local_time(); { std::lock_guard<std::mutex> lock(_lock); @@ -1157,8 +1284,6 @@ void FragmentMgr::cancel_worker() { } } - const auto& running_fes = ExecEnv::GetInstance()->get_running_frontends(); - // We use a very conservative cancel strategy. // 0. If there are no running frontends, do not cancel any queries. // 1. If query's process uuid is zero, do not cancel @@ -1171,12 +1296,35 @@ void FragmentMgr::cancel_worker() { << "We will not cancel any outdated queries in this situation."; } else { for (const auto& q : _query_ctx_map) { - if (q.second->get_fe_process_uuid() == 0) { + auto q_ctx = q.second; + const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid(); + + if (fe_process_uuid == 0) { // zero means this query is from an older version fe or // this fe is starting continue; } + // If the query is not running on any frontend, cancel it. + if (auto itr = running_queries_on_all_fes.find(fe_process_uuid); + itr != running_queries_on_all_fes.end()) { + // Query not found on this frontend, and the query arrived before the last check + if (itr->second.find(q_ctx->query_id()) == itr->second.end() && + q_ctx->get_query_arrival_timestamp().tv_nsec < + check_invalid_query_last_timestamp.tv_nsec && + q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) { + if (q_ctx->enable_pipeline_x_exec()) { + queries_pipeline_task_leak.push_back(q_ctx->query_id()); + LOG_INFO( + "Query {}, type {} is not found on any frontends, maybe it " + "is leaked.", + print_id(q_ctx->query_id()), + toString(q_ctx->get_query_source())); + continue; + } + } + } + auto query_context = q.second; auto itr = running_fes.find(query_context->coord_addr); @@ -1236,6 +1384,13 @@ void FragmentMgr::cancel_worker() { << print_id(id); } + for (const auto& qid : queries_pipeline_task_leak) { + // Cancel the query, and maybe try to report debug info to fe so that we can + // collect debug info by sql or http api instead of searching logs.
+ cancel_query(qid, PPlanFragmentCancelReason::INTERNAL_ERROR, + std::string("Pipeline task leak.")); + } + if (!queries_to_cancel.empty()) { LOG(INFO) << "There are " << queries_to_cancel.size() << " queries need to be cancelled, coordinator dead or restarted."; @@ -1245,6 +1400,7 @@ void FragmentMgr::cancel_worker() { cancel_query(qid, PPlanFragmentCancelReason::INTERNAL_ERROR, std::string("Coordinator dead.")); } + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1))); LOG(INFO) << "FragmentMgr cancel worker is going to exit."; } @@ -1377,7 +1533,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, exec_fragment_params.__set_query_options(query_options); VLOG_ROW << "external exec_plan_fragment params is " << apache::thrift::ThriftDebugString(exec_fragment_params).c_str(); - return exec_plan_fragment(exec_fragment_params); + return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR); } Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 16da4826165..608ee522bad 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -79,17 +79,19 @@ public: void stop(); // execute one plan fragment - Status exec_plan_fragment(const TExecPlanFragmentParams& params); + Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type); - Status exec_plan_fragment(const TPipelineFragmentParams& params); + Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type); void remove_pipeline_context( std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context); // TODO(zc): report this is over - Status exec_plan_fragment(const TExecPlanFragmentParams& params, const FinishCallback& cb); + Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type, + const FinishCallback& cb); - Status exec_plan_fragment(const TPipelineFragmentParams& params, const FinishCallback& cb); + Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type, + const FinishCallback& cb); Status start_query_execution(const PExecPlanFragmentStartRequest* request); @@ -172,7 +174,7 @@ private: template <typename Params> Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, - std::shared_ptr<QueryContext>& query_ctx); + QuerySource query_type, std::shared_ptr<QueryContext>& query_ctx); // This is input params ExecEnv* _exec_env = nullptr; diff --git a/be/src/runtime/frontend_info.h b/be/src/runtime/frontend_info.h index a7e4b3f999b..c4d3d710b3c 100644 --- a/be/src/runtime/frontend_info.h +++ b/be/src/runtime/frontend_info.h @@ -17,6 +17,7 @@ #pragma once #include <gen_cpp/HeartbeatService_types.h> +#include <gen_cpp/Types_types.h> #include <ctime> @@ -28,4 +29,9 @@ struct FrontendInfo { std::time_t last_reveiving_time_ms; }; +struct FrontendAddrAndRunningQueries { + TNetworkAddress frontend_addr; + std::set<TUniqueId> running_queries; +}; + } // namespace doris diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index a5bf52d2ca7..86030664b42 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -535,9 +535,11 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id, } }; if (is_pipeline) { - return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, finish_cb); + return 
_exec_env->fragment_mgr()->exec_plan_fragment( + pipeline_params, QuerySource::GROUP_COMMIT_LOAD, finish_cb); } else { - return _exec_env->fragment_mgr()->exec_plan_fragment(params, finish_cb); + return _exec_env->fragment_mgr()->exec_plan_fragment(params, QuerySource::GROUP_COMMIT_LOAD, + finish_cb); } } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index b8dfb176d98..e69132e3cb2 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -41,16 +41,35 @@ public: std::unique_ptr<ThreadPoolToken> token_; }; +const std::string toString(QuerySource queryType) { + switch (queryType) { + case QuerySource::INTERNAL_FRONTEND: + return "INTERNAL_FRONTEND"; + case QuerySource::STREAM_LOAD: + return "STREAM_LOAD"; + case QuerySource::GROUP_COMMIT_LOAD: + return "GROUP_COMMIT_LOAD"; + case QuerySource::ROUTINE_LOAD: + return "ROUTINE_LOAD"; + case QuerySource::EXTERNAL_CONNECTOR: + return "EXTERNAL_CONNECTOR"; + default: + return "UNKNOWN"; + } +} + QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env, const TQueryOptions& query_options, TNetworkAddress coord_addr, - bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe) + bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe, + QuerySource query_source) : fragment_num(total_fragment_num), timeout_second(-1), _query_id(query_id), _exec_env(exec_env), _is_pipeline(is_pipeline), _is_nereids(is_nereids), - _query_options(query_options) { + _query_options(query_options), + _query_source(query_source) { _init_query_mem_tracker(); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker); @@ -77,7 +96,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0; DCHECK_EQ(is_report_fe_addr_valid, true); } - + clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp); register_memory_statistics(); register_cpu_statistics(); } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 82e75b72cee..3d523522337 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -61,6 +61,16 @@ struct ReportStatusRequest { std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn; }; +enum class QuerySource { + INTERNAL_FRONTEND, + STREAM_LOAD, + GROUP_COMMIT_LOAD, + ROUTINE_LOAD, + EXTERNAL_CONNECTOR +}; + +const std::string toString(QuerySource query_source); + // Save the common components of fragments in a query. // Some components like DescriptorTbl may be very large // that will slow down each execution of fragments when DeSer them every time. @@ -71,7 +81,7 @@ class QueryContext { public: QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env, const TQueryOptions& query_options, TNetworkAddress coord_addr, bool is_pipeline, - bool is_nereids, TNetworkAddress current_connect_fe); + bool is_nereids, TNetworkAddress current_connect_fe, QuerySource query_type); ~QueryContext(); @@ -352,6 +362,14 @@ private: std::mutex _weighted_mem_lock; int64_t _weighted_consumption = 0; int64_t _weighted_limit = 0; + timespec _query_arrival_timestamp; + // Distinguish the query source; for queries that come from the FE, there are memory structures on the FE to + // help us manage the query.
+ QuerySource _query_source; + +public: + timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; } + QuerySource get_query_source() const { return this->_query_source; } }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 0fcd4af1ce5..0761b445bee 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -143,10 +143,11 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte }; if (ctx->put_result.__isset.params) { - st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, exec_fragment); + st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, + QuerySource::STREAM_LOAD, exec_fragment); } else { st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params, - exec_fragment); + QuerySource::STREAM_LOAD, exec_fragment); } if (!st.ok()) { diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 324c21d91ae..e26264b1a22 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -641,7 +641,8 @@ Status BackendService::start_plan_fragment_execution(const TExecPlanFragmentPara if (!exec_params.fragment.__isset.output_sink) { return Status::InternalError("missing sink in plan fragment"); } - return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params); + return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params, + QuerySource::INTERNAL_FRONTEND); } void BackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 013af8e8030..ad8769566c0 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -524,9 +524,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl( RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); } if (cb) { - return _exec_env->fragment_mgr()->exec_plan_fragment(t_request, cb); + return _exec_env->fragment_mgr()->exec_plan_fragment( + t_request, QuerySource::INTERNAL_FRONTEND, cb); } else { - return _exec_env->fragment_mgr()->exec_plan_fragment(t_request); + return _exec_env->fragment_mgr()->exec_plan_fragment(t_request, + QuerySource::INTERNAL_FRONTEND); } } else if (version == PFragmentRequestVersion::VERSION_2) { TExecPlanFragmentParamsList t_request; @@ -541,9 +543,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl( for (const TExecPlanFragmentParams& params : t_request.paramsList) { if (cb) { - RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params, cb)); + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( + params, QuerySource::INTERNAL_FRONTEND, cb)); } else { - RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( + params, QuerySource::INTERNAL_FRONTEND)); } } @@ -572,9 +576,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl( timer.start(); for (const TPipelineFragmentParams& fragment : fragment_list) { if (cb) { - RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment, cb)); + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( + fragment, QuerySource::INTERNAL_FRONTEND, cb)); } else { - RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment)); + 
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( + fragment, QuerySource::INTERNAL_FRONTEND)); } } timer.stop(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 001fbd5a68c..b04b5aa3892 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -141,6 +141,8 @@ import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TFeResult; import org.apache.doris.thrift.TFetchResourceResult; +import org.apache.doris.thrift.TFetchRunningQueriesRequest; +import org.apache.doris.thrift.TFetchRunningQueriesResult; import org.apache.doris.thrift.TFetchSchemaTableDataRequest; import org.apache.doris.thrift.TFetchSchemaTableDataResult; import org.apache.doris.thrift.TFetchSplitBatchRequest; @@ -237,6 +239,7 @@ import org.apache.doris.thrift.TTableRef; import org.apache.doris.thrift.TTableStatus; import org.apache.doris.thrift.TTabletLocation; import org.apache.doris.thrift.TTxnParams; +import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TUpdateExportTaskStatusRequest; import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest; import org.apache.doris.thrift.TWaitingTxnStatusRequest; @@ -4013,4 +4016,24 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setUserinfoList(userInfo); return result; } + + @Override + public TFetchRunningQueriesResult fetchRunningQueries(TFetchRunningQueriesRequest request) { + TFetchRunningQueriesResult result = new TFetchRunningQueriesResult(); + if (!Env.getCurrentEnv().isReady()) { + result.setStatus(new TStatus(TStatusCode.ILLEGAL_STATE)); + return result; + } + + List<TUniqueId> runningQueries = Lists.newArrayList(); + List<Coordinator> allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators(); + + for (Coordinator coordinator : allCoordinators) { + runningQueries.add(coordinator.getQueryId()); + } + + result.setStatus(new TStatus(TStatusCode.OK)); + result.setRunningQueries(runningQueries); + return result; + } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 5e5fd13c36d..7d5b94bd9fe 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -30,6 +30,7 @@ include "RuntimeProfile.thrift" include "MasterService.thrift" include "AgentService.thrift" include "DataSinks.thrift" +include "HeartbeatService.thrift" // These are supporting structs for JniFrontend.java, which serves as the glue // between our C++ execution environment and the Java frontend. 
@@ -1479,6 +1480,14 @@ struct TFetchSplitBatchResult { 1: optional list<Planner.TScanRangeLocations> splits } +struct TFetchRunningQueriesResult { + 1: optional Status.TStatus status + 2: optional list<Types.TUniqueId> running_queries +} + +struct TFetchRunningQueriesRequest { +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1569,4 +1578,6 @@ service FrontendService { TShowProcessListResult showProcessList(1: TShowProcessListRequest request) TShowUserResult showUser(1: TShowUserRequest request) TFetchSplitBatchResult fetchSplitBatch(1: TFetchRunningQueriesRequest request) + + TFetchRunningQueriesResult fetchRunningQueries(1: TFetchRunningQueriesRequest request) }
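A note on how the pieces of this patch fit together. The cancel worker wakes every second, but the per-FE snapshot is only refreshed once config::pipeline_task_leakage_detect_period_secs (default 60) has elapsed; in-between rounds run with an empty snapshot and therefore cancel nothing. A self-contained sketch of that gating, where should_refresh_snapshot is a hypothetical stand-in for the inline check inside cancel_worker:

#include <cstdint>
#include <ctime>

// Returns true when the cancel worker should refresh its per-FE snapshot of
// running queries (i.e. call _get_all_running_queries_from_fe in the patch).
// Uses CLOCK_MONOTONIC, matching the patch, so wall-clock jumps are harmless.
inline bool should_refresh_snapshot(timespec& last_check, int64_t period_secs) {
    timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    if (now.tv_sec - last_check.tv_sec > period_secs) {
        last_check = now;
        return true;  // caller fetches a fresh snapshot from every alive FE
    }
    return false;     // stale round: snapshot stays empty, no leak cancels
}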
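The detection rule itself distills to the conjunction below. This is a minimal sketch with hypothetical standalone types (QueryRecord stands in for QueryContext, plain int64_t for TUniqueId); the real logic lives in FragmentMgr::cancel_worker:

#include <cstdint>
#include <map>
#include <unordered_set>

// Hypothetical stand-in for QueryContext.
struct QueryRecord {
    int64_t query_id;             // stand-in for TUniqueId
    int64_t fe_process_uuid;      // uuid of the FE that issued the query
    int64_t arrival_sec;          // CLOCK_MONOTONIC seconds at query arrival
    bool from_internal_frontend;  // QuerySource::INTERNAL_FRONTEND
    bool pipeline_x;              // the check only fires for PipelineX
};

// snapshot: FE process uuid -> query ids that FE still tracks, as returned by
// the new fetchRunningQueries RPC; last_check_sec is when it was taken.
bool is_leaked(const QueryRecord& q,
               const std::map<int64_t, std::unordered_set<int64_t>>& snapshot,
               int64_t last_check_sec) {
    if (q.fe_process_uuid == 0) {
        return false;  // older-version FE or FE still starting: never cancel
    }
    auto it = snapshot.find(q.fe_process_uuid);
    if (it == snapshot.end()) {
        return false;  // no fresh data for this FE in this round
    }
    return it->second.count(q.query_id) == 0  // FE no longer knows the query
           && q.arrival_sec < last_check_sec  // query predates the snapshot
           && q.from_internal_frontend        // only FE-issued queries
           && q.pipeline_x;                   // limited to PipelineX
}

The arrival-time guard is what keeps the strategy conservative: a query that arrived after the snapshot was taken can be absent from it for benign reasons, so it is never treated as leaked.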
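Finally, note the API-surface change: every FragmentMgr::exec_plan_fragment overload now takes a QuerySource, so any out-of-tree caller has to pass one explicitly. A hypothetical call site, sketched under the assumption that the Doris BE headers are available (the QuerySource value and callback body are illustrative only):

// Hypothetical caller: the QuerySource argument is now mandatory on all
// exec_plan_fragment overloads; the lambda is the FinishCallback.
Status submit_fragment(ExecEnv* exec_env, const TPipelineFragmentParams& params) {
    return exec_env->fragment_mgr()->exec_plan_fragment(
            params, QuerySource::INTERNAL_FRONTEND,
            [](RuntimeState* state, Status* status) {
                // finish callback: runs when the fragment completes
            });
}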