This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 830f250a804 [opt](query cancel) cancel query if it has pipeline task leakage #39223 (#39537)
830f250a804 is described below

commit 830f250a804ba292474a2838e12892b54c595324
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Mon Aug 19 14:33:59 2024 +0800

    [opt](query cancel) cancel query if it has pipeline task leakage #39223 (#39537)
    
    pick #39223 with some modifications. The optimization will only be applied
    to PipelineX.
---
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   2 +
 be/src/io/fs/multi_table_pipe.cpp                  |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   9 +
 be/src/runtime/fragment_mgr.cpp                    | 188 +++++++++++++++++++--
 be/src/runtime/fragment_mgr.h                      |  12 +-
 be/src/runtime/frontend_info.h                     |   6 +
 be/src/runtime/group_commit_mgr.cpp                |   6 +-
 be/src/runtime/query_context.cpp                   |  25 ++-
 be/src/runtime/query_context.h                     |  20 ++-
 .../runtime/stream_load/stream_load_executor.cpp   |   5 +-
 be/src/service/backend_service.cpp                 |   3 +-
 be/src/service/internal_service.cpp                |  18 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  23 +++
 gensrc/thrift/FrontendService.thrift               |  11 ++
 15 files changed, 295 insertions(+), 37 deletions(-)
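
For readers skimming the diff, the whole patch boils down to one periodic decision in
FragmentMgr::cancel_worker(). A toy model of that decision follows (simplified on
purpose: int64_t stands in for TUniqueId, and the committed code additionally requires
QuerySource::INTERNAL_FRONTEND plus pipeline-x execution before cancelling):

    #include <cstdint>
    #include <map>
    #include <set>
    #include <utility>
    #include <vector>

    // A query is considered leaked when its owning FE answered the
    // fetchRunningQueries RPC, the answer does not contain the query, and the
    // query arrived before the previous check (so the FE must know about it).
    std::vector<int64_t> find_leaked(
            const std::map<int64_t, std::set<int64_t>>& running_on_fes, // fe uuid -> query ids
            const std::vector<std::pair<int64_t, int64_t>>& local,      // (fe uuid, query id)
            const std::set<int64_t>& arrived_before_last_check) {
        std::vector<int64_t> leaked;
        for (const auto& [fe_uuid, query_id] : local) {
            auto itr = running_on_fes.find(fe_uuid);
            if (itr == running_on_fes.end()) {
                continue; // no (reliable) answer from this FE: be conservative
            }
            if (itr->second.count(query_id) == 0 &&
                arrived_before_last_check.count(query_id) != 0) {
                leaked.push_back(query_id);
            }
        }
        return leaked;
    }

Everything below wires this decision into the existing cancel loop and threads a new
QuerySource tag through every exec_plan_fragment() entry point.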

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index fe03cc9158f..461e328044d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1270,6 +1270,8 @@ DEFINE_mBool(enable_parquet_page_index, "false");
 
 DEFINE_mBool(ignore_not_found_file_in_external_table, "true");
 
+DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index afcdc62cb78..570fb61c5f3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1358,6 +1358,8 @@ DECLARE_mBool(enable_parquet_page_index);
 // Default is true, if set to false, the not found file will result in query failure.
 DECLARE_mBool(ignore_not_found_file_in_external_table);
 
+DECLARE_mInt64(pipeline_task_leakage_detect_period_secs);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
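
The new knob rate-limits how often cancel_worker() performs the FE round trips. The
gating in the fragment_mgr.cpp hunk below is monotonic-clock based; in isolation it is
roughly the following (standalone sketch, where period_secs stands in for
config::pipeline_task_leakage_detect_period_secs):

    #include <cstdint>
    #include <ctime>

    // Returns true at most once per period_secs. CLOCK_MONOTONIC is used so that
    // wall-clock adjustments can neither trigger nor suppress a detection round.
    bool should_run_detection(timespec& last_check, int64_t period_secs) {
        timespec now;
        clock_gettime(CLOCK_MONOTONIC, &now);
        if (now.tv_sec - last_check.tv_sec > period_secs) {
            last_check = now;
            return true;
        }
        return false;
    }
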
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 4469174211e..75fd05f2ef8 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -249,7 +249,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
         _inflight_cnt++;
 
         RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
-                plan, [this, plan](RuntimeState* state, Status* status) {
+                plan, QuerySource::ROUTINE_LOAD, [this, plan](RuntimeState* state, Status* status) {
                     DCHECK(state);
                     auto pair = _planned_tables.find(plan.table_name);
                     if (pair == _planned_tables.end()) {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 236c2790ccf..df7cd2ece28 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -21,6 +21,7 @@
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Planner_types.h>
+#include <gen_cpp/types.pb.h>
 #include <pthread.h>
 
 #include <cstdlib>
@@ -184,6 +185,14 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     // make result receiver on fe be stocked on rpc forever until timeout...
     // We need a more detail discussion.
     _query_ctx->cancel(msg, Status::Cancelled(msg));
+
+    if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) {
+        if (msg.find("Pipeline task leak.") != std::string::npos) {
+            LOG_WARNING("PipelineFragmentContext is cancelled due to illegal 
state : {}",
+                        this->debug_string());
+        }
+    }
+
     if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
         _is_report_on_cancel = false;
     } else {
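
The substring match above is one half of a contract: the cancel call added to
FragmentMgr::cancel_worker() later in this diff sends exactly this message. A minimal
sketch of that contract (the shared constant is hypothetical, not part of the patch;
both committed sites use the string literal directly):

    #include <string>

    // Keeping the literal in one place would prevent the producer in
    // cancel_worker() and the consumer in PipelineFragmentContext::cancel()
    // from silently drifting apart.
    static const std::string kPipelineTaskLeakMsg = "Pipeline task leak.";

    bool is_pipeline_task_leak(const std::string& cancel_msg) {
        return cancel_msg.find(kPipelineTaskLeakMsg) != std::string::npos;
    }
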
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c61bb82df75..74f42a28b8a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -17,6 +17,7 @@
 
 #include "runtime/fragment_mgr.h"
 
+#include <bits/types/struct_timespec.h>
 #include <bvar/latency_recorder.h>
 #include <exprs/runtime_filter.h>
 #include <fmt/format.h>
@@ -31,8 +32,10 @@
 #include <gen_cpp/QueryPlanExtra_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
 #include <pthread.h>
 #include <stddef.h>
+#include <thrift/TApplicationException.h>
 #include <thrift/Thrift.h>
 #include <thrift/protocol/TDebugProtocol.h>
 #include <thrift/transport/TTransportException.h>
@@ -44,10 +47,13 @@
 #include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <sstream>
+#include <unordered_map>
+#include <unordered_set>
 #include <utility>
 
 #include "common/config.h"
@@ -115,6 +121,103 @@ std::string to_load_error_http_path(const std::string& file_name) {
 using apache::thrift::TException;
 using apache::thrift::transport::TTransportException;
 
+static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
+                                            std::unordered_set<TUniqueId>& query_set) {
+    TFetchRunningQueriesResult rpc_result;
+    TFetchRunningQueriesRequest rpc_request;
+
+    Status client_status;
+    const int32 timeout_ms = 3 * 1000;
+    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
+                                         fe_info.info.coordinator_address, timeout_ms,
+                                         &client_status);
+    // Abort this fe.
+    if (!client_status.ok()) {
+        LOG_WARNING("Failed to get client for {}, reason is {}",
+                    
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                    client_status.to_string());
+        return Status::InternalError("Failed to get client for {}, reason is 
{}",
+                                     
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                                     client_status.to_string());
+    }
+
+    // do rpc
+    try {
+        try {
+            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG_WARNING("Transport exception reason: {}, reopening", e.what());
+            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+            if (!client_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", 
client_status.to_string_no_stack());
+                return Status::InternalError("Reopen failed, reason: {}",
+                                             
client_status.to_string_no_stack());
+            }
+
+            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
+        }
+    } catch (apache::thrift::TException& e) {
+        // During cluster upgrade or on any other network error.
+        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
+                    
PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what());
+        return Status::InternalError("Failed to fetch running queries from {}, 
reason: {}",
+                                     
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                                     e.what());
+    }
+
+    // Avoid logic error in frontend.
+    if (rpc_result.__isset.status == false || rpc_result.status.status_code != TStatusCode::OK) {
+        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
+                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                    doris::to_string(rpc_result.status.status_code));
+        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
+                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                                     doris::to_string(rpc_result.status.status_code));
+    }
+
+    if (rpc_result.__isset.running_queries == false) {
+        return Status::InternalError("Failed to fetch running queries from {}, 
reason: {}",
+                                     
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                                     "running_queries is not set");
+    }
+
+    query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
+                                              rpc_result.running_queries.end());
+    return Status::OK();
+};
+
+static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() {
+    const std::map<TNetworkAddress, FrontendInfo>& running_fes =
+            ExecEnv::GetInstance()->get_running_frontends();
+
+    std::map<int64_t, std::unordered_set<TUniqueId>> result;
+    std::vector<FrontendInfo> qualified_fes;
+
+    for (const auto& fe : running_fes) {
+        // Only consider normal frontend.
+        if (fe.first.port != 0 && fe.second.info.process_uuid != 0) {
+            qualified_fes.push_back(fe.second);
+        } else {
+            return {};
+        }
+    }
+
+    for (const auto& fe_addr : qualified_fes) {
+        const int64_t process_uuid = fe_addr.info.process_uuid;
+        std::unordered_set<TUniqueId> query_set;
+        Status st = _do_fetch_running_queries_rpc(fe_addr, query_set);
+        if (!st.ok()) {
+            // Empty result, cancel worker will not do anything
+            return {};
+        }
+
+        // frontend_info and process_uuid have been checked in rpc threads.
+        result[process_uuid] = query_set;
+    }
+
+    return result;
+}
+
 FragmentMgr::FragmentMgr(ExecEnv* exec_env)
         : _exec_env(exec_env), _stop_background_threads_latch(1) {
     _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
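
One property of the two helpers above is worth calling out: collection is all-or-nothing.
Any abnormal FE in the heartbeat map, or any failed RPC, makes
_get_all_running_queries_from_fe() return an empty map, and an empty map disables leak
cancellation for the whole round. A condensed sketch of that contract (hypothetical
names, simplified types):

    #include <map>
    #include <optional>
    #include <set>
    #include <vector>

    struct FeAnswer {
        long long process_uuid;
        std::set<long long> queries;
    };

    // One missing answer poisons the round: returning nothing means the caller
    // must skip cancellation, because a false kill is far worse than waiting
    // another detection period.
    std::optional<std::map<long long, std::set<long long>>> collect_answers(
            const std::vector<std::optional<FeAnswer>>& answers) {
        std::map<long long, std::set<long long>> result;
        for (const auto& answer : answers) {
            if (!answer) {
                return std::nullopt;
            }
            result[answer->process_uuid] = answer->queries;
        }
        return result;
    }
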
@@ -506,7 +609,8 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
     cb(fragment_executor->runtime_state(), &status);
 }
 
-Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
+Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
+                                       const QuerySource query_source) {
     if (params.txn_conf.need_txn) {
         std::shared_ptr<StreamLoadContext> stream_load_ctx =
                 std::make_shared<StreamLoadContext>(_exec_env);
@@ -539,11 +643,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
         RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
         return Status::OK();
     } else {
-        return exec_plan_fragment(params, empty_function);
+        return exec_plan_fragment(params, query_source, empty_function);
     }
 }
 
-Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) {
+Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
+                                       const QuerySource query_source) {
     if (params.txn_conf.need_txn) {
         std::shared_ptr<StreamLoadContext> stream_load_ctx =
                 std::make_shared<StreamLoadContext>(_exec_env);
@@ -575,7 +680,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) {
         RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
         return Status::OK();
     } else {
-        return exec_plan_fragment(params, empty_function);
+        return exec_plan_fragment(params, query_source, empty_function);
     }
 }
 
@@ -619,6 +724,7 @@ void FragmentMgr::remove_pipeline_context(
 
 template <typename Params>
 Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
+                                   QuerySource query_source,
                                    std::shared_ptr<QueryContext>& query_ctx) {
     DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed",
                     { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
@@ -660,9 +766,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
 
         // This may be a first fragment request of the query.
         // Create the query fragments context.
-        query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env,
-                                                params.query_options, params.coord, pipeline,
-                                                params.is_nereids, current_connect_fe_addr);
+        query_ctx = QueryContext::create_shared(
+                query_id, params.fragment_num_on_host, _exec_env, params.query_options,
+                params.coord, pipeline, params.is_nereids, current_connect_fe_addr, query_source);
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
         RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
                                               &(query_ctx->desc_tbl)));
@@ -712,7 +818,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
 }
 
 Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
-                                       const FinishCallback& cb) {
+                                       QuerySource query_source, const FinishCallback& cb) {
     VLOG_ROW << "exec_plan_fragment params is "
              << apache::thrift::ThriftDebugString(params).c_str();
     // sometimes TExecPlanFragmentParams debug string is too long and glog
@@ -734,8 +840,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
     bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
                                    params.query_options.enable_pipeline_engine;
 
-    RETURN_IF_ERROR(
-            _get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, query_ctx));
+    RETURN_IF_ERROR(_get_query_ctx(params, params.params.query_id, pipeline_engine_enabled,
+                                   query_source, query_ctx));
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
     {
         // Need lock here, because it will modify fragment ids and std::vector may resize and reallocate
@@ -830,7 +936,7 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
 }
 
 Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
-                                       const FinishCallback& cb) {
+                                       QuerySource query_source, const FinishCallback& cb) {
     VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is "
              << apache::thrift::ThriftDebugString(params).c_str();
     // sometimes TExecPlanFragmentParams debug string is too long and glog
@@ -839,7 +945,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
              << apache::thrift::ThriftDebugString(params.query_options).c_str();
 
     std::shared_ptr<QueryContext> query_ctx;
-    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
+    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_source, query_ctx));
     SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, params.query_id);
     const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine &&
                                    params.query_options.enable_pipeline_x_engine;
@@ -1125,9 +1231,30 @@ void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id
 
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
+
+    timespec check_invalid_query_last_timestamp;
+    clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
+
     do {
         std::vector<TUniqueId> to_cancel;
         std::vector<TUniqueId> queries_to_cancel;
+        std::vector<TUniqueId> queries_pipeline_task_leak;
+        // Fe process uuid -> set<QueryId>
+        std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes;
+        const std::map<TNetworkAddress, FrontendInfo>& running_fes =
+                ExecEnv::GetInstance()->get_running_frontends();
+
+        timespec now_for_check_invalid_query;
+        clock_gettime(CLOCK_MONOTONIC, &now_for_check_invalid_query);
+
+        if (now_for_check_invalid_query.tv_sec - check_invalid_query_last_timestamp.tv_sec >
+            config::pipeline_task_leakage_detect_period_secs) {
+            check_invalid_query_last_timestamp = now_for_check_invalid_query;
+            running_queries_on_all_fes = _get_all_running_queries_from_fe();
+        } else {
+            running_queries_on_all_fes.clear();
+        }
+
         VecDateTimeValue now = VecDateTimeValue::local_time();
         {
             std::lock_guard<std::mutex> lock(_lock);
@@ -1157,8 +1284,6 @@ void FragmentMgr::cancel_worker() {
                 }
             }
 
-            const auto& running_fes = 
ExecEnv::GetInstance()->get_running_frontends();
-
             // We use a very conservative cancel strategy.
             // 0. If there are no running frontends, do not cancel any queries.
             // 1. If query's process uuid is zero, do not cancel
@@ -1171,12 +1296,35 @@ void FragmentMgr::cancel_worker() {
                         << "We will not cancel any outdated queries in this 
situation.";
             } else {
                 for (const auto& q : _query_ctx_map) {
-                    if (q.second->get_fe_process_uuid() == 0) {
+                    auto q_ctx = q.second;
+                    const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
+
+                    if (fe_process_uuid == 0) {
                         // zero means this query is from an older version fe or
                         // this fe is starting
                         continue;
                     }
 
+                    // If the query is not running on any frontend, cancel it.
+                    if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
+                        itr != running_queries_on_all_fes.end()) {
+                        // Query not found on this frontend, and the query arrived before the last check
+                        if (itr->second.find(q_ctx->query_id()) == itr->second.end() &&
+                            q_ctx->get_query_arrival_timestamp().tv_nsec <
+                                    check_invalid_query_last_timestamp.tv_nsec &&
+                            q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
+                            if (q_ctx->enable_pipeline_x_exec()) {
+                                queries_pipeline_task_leak.push_back(q_ctx->query_id());
+                                LOG_INFO(
+                                        "Query {}, type {} is not found on any frontends, maybe it "
+                                        "is leaked.",
+                                        print_id(q_ctx->query_id()),
+                                        toString(q_ctx->get_query_source()));
+                                continue;
+                            }
+                        }
+                    }
+
                     auto query_context = q.second;
 
                     auto itr = running_fes.find(query_context->coord_addr);
@@ -1236,6 +1384,13 @@ void FragmentMgr::cancel_worker() {
                       << print_id(id);
         }
 
+        for (const auto& qid : queries_pipeline_task_leak) {
+            // Cancel the query, and maybe try to report debug info to fe so that we can
+            // collect debug info by sql or http api instead of search log.
+            cancel_query(qid, PPlanFragmentCancelReason::INTERNAL_ERROR,
+                         std::string("Pipeline task leak."));
+        }
+
         if (!queries_to_cancel.empty()) {
             LOG(INFO) << "There are " << queries_to_cancel.size()
                       << " queries need to be cancelled, coordinator dead or 
restarted.";
@@ -1245,6 +1400,7 @@ void FragmentMgr::cancel_worker() {
             cancel_query(qid, PPlanFragmentCancelReason::INTERNAL_ERROR,
                          std::string("Coordinator dead."));
         }
+
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
     LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
 }
@@ -1377,7 +1533,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
     exec_fragment_params.__set_query_options(query_options);
     VLOG_ROW << "external exec_plan_fragment params is "
              << 
apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
-    return exec_plan_fragment(exec_fragment_params);
+    return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR);
 }
 
 Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
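
One subtlety in the cancel_worker() hunk above: "the query arrived before the last
check" is decided by comparing timespec values, and the comparison as committed looks
only at the tv_nsec fields. For reference, a total order on timespec compares seconds
first and uses nanoseconds only as a tiebreaker (sketch):

    #include <ctime>

    // Strict "a happened before b" on CLOCK_MONOTONIC timestamps.
    bool timespec_less(const timespec& a, const timespec& b) {
        if (a.tv_sec != b.tv_sec) {
            return a.tv_sec < b.tv_sec;
        }
        return a.tv_nsec < b.tv_nsec;
    }
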
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 16da4826165..608ee522bad 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -79,17 +79,19 @@ public:
     void stop();
 
     // execute one plan fragment
-    Status exec_plan_fragment(const TExecPlanFragmentParams& params);
+    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const 
QuerySource query_type);
 
-    Status exec_plan_fragment(const TPipelineFragmentParams& params);
+    Status exec_plan_fragment(const TPipelineFragmentParams& params, const 
QuerySource query_type);
 
     void remove_pipeline_context(
             std::shared_ptr<pipeline::PipelineFragmentContext> 
pipeline_context);
 
     // TODO(zc): report this is over
-    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const FinishCallback& cb);
+    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type,
+                              const FinishCallback& cb);

-    Status exec_plan_fragment(const TPipelineFragmentParams& params, const FinishCallback& cb);
+    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
+                              const FinishCallback& cb);
 
     Status start_query_execution(const PExecPlanFragmentStartRequest* request);
 
@@ -172,7 +174,7 @@ private:
 
     template <typename Params>
     Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
-                          std::shared_ptr<QueryContext>& query_ctx);
+                          QuerySource query_type, std::shared_ptr<QueryContext>& query_ctx);
 
     // This is input params
     ExecEnv* _exec_env = nullptr;
diff --git a/be/src/runtime/frontend_info.h b/be/src/runtime/frontend_info.h
index a7e4b3f999b..c4d3d710b3c 100644
--- a/be/src/runtime/frontend_info.h
+++ b/be/src/runtime/frontend_info.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <gen_cpp/HeartbeatService_types.h>
+#include <gen_cpp/Types_types.h>
 
 #include <ctime>
 
@@ -28,4 +29,9 @@ struct FrontendInfo {
     std::time_t last_reveiving_time_ms;
 };
 
+struct FrontendAddrAndRunningQueries {
+    TNetworkAddress frontend_addr;
+    std::set<TUniqueId> running_queries;
+};
+
 } // namespace doris
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index a5bf52d2ca7..86030664b42 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -535,9 +535,11 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id,
         }
     };
     if (is_pipeline) {
-        return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, 
finish_cb);
+        return _exec_env->fragment_mgr()->exec_plan_fragment(
+                pipeline_params, QuerySource::GROUP_COMMIT_LOAD, finish_cb);
     } else {
-        return _exec_env->fragment_mgr()->exec_plan_fragment(params, 
finish_cb);
+        return _exec_env->fragment_mgr()->exec_plan_fragment(params, 
QuerySource::GROUP_COMMIT_LOAD,
+                                                             finish_cb);
     }
 }
 
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index b8dfb176d98..e69132e3cb2 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -41,16 +41,35 @@ public:
     std::unique_ptr<ThreadPoolToken> token_;
 };
 
+const std::string toString(QuerySource queryType) {
+    switch (queryType) {
+    case QuerySource::INTERNAL_FRONTEND:
+        return "INTERNAL_FRONTEND";
+    case QuerySource::STREAM_LOAD:
+        return "STREAM_LOAD";
+    case QuerySource::GROUP_COMMIT_LOAD:
+        return "GROUP_COMMIT_LOAD";
+    case QuerySource::ROUTINE_LOAD:
+        return "ROUTINE_LOAD";
+    case QuerySource::EXTERNAL_CONNECTOR:
+        return "EXTERNAL_CONNECTOR";
+    default:
+        return "UNKNOWN";
+    }
+}
+
 QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
                            const TQueryOptions& query_options, TNetworkAddress coord_addr,
-                           bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe)
+                           bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe,
+                           QuerySource query_source)
         : fragment_num(total_fragment_num),
           timeout_second(-1),
           _query_id(query_id),
           _exec_env(exec_env),
           _is_pipeline(is_pipeline),
           _is_nereids(is_nereids),
-          _query_options(query_options) {
+          _query_options(query_options),
+          _query_source(query_source) {
     _init_query_mem_tracker();
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
 
@@ -77,7 +96,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
                 !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
         DCHECK_EQ(is_report_fe_addr_valid, true);
     }
-
+    clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
     register_memory_statistics();
     register_cpu_statistics();
 }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 82e75b72cee..3d523522337 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -61,6 +61,16 @@ struct ReportStatusRequest {
     std::function<void(const PPlanFragmentCancelReason&, const std::string&)> 
cancel_fn;
 };
 
+enum class QuerySource {
+    INTERNAL_FRONTEND,
+    STREAM_LOAD,
+    GROUP_COMMIT_LOAD,
+    ROUTINE_LOAD,
+    EXTERNAL_CONNECTOR
+};
+
+const std::string toString(QuerySource query_source);
+
 // Save the common components of fragments in a query.
 // Some components like DescriptorTbl may be very large
 // that will slow down each execution of fragments when DeSer them every time.
@@ -71,7 +81,7 @@ class QueryContext {
 public:
     QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
                  const TQueryOptions& query_options, TNetworkAddress coord_addr, bool is_pipeline,
-                 bool is_nereids, TNetworkAddress current_connect_fe);
+                 bool is_nereids, TNetworkAddress current_connect_fe, QuerySource query_type);
 
     ~QueryContext();
 
@@ -352,6 +362,14 @@ private:
     std::mutex _weighted_mem_lock;
     int64_t _weighted_consumption = 0;
     int64_t _weighted_limit = 0;
+    timespec _query_arrival_timestamp;
+    // Distinguish the query source; for queries that come from the FE, there is some memory
+    // structure on the FE to help us manage the query.
+    QuerySource _query_source;
+
+public:
+    timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
+    QuerySource get_query_source() const { return this->_query_source; }
 };
 
 } // namespace doris
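
QuerySource is a plain enum class threaded through every exec_plan_fragment() entry
point, so eligibility checks stay explicit at the point of use. A self-contained sketch
of the test the leak detector applies (enum redeclared here only for illustration; the
real declaration is the one above):

    enum class QuerySource {
        INTERNAL_FRONTEND,
        STREAM_LOAD,
        GROUP_COMMIT_LOAD,
        ROUTINE_LOAD,
        EXTERNAL_CONNECTOR
    };

    // Only queries coordinated by an FE can be cross-checked against the FE's
    // running-query list; loads and external connectors have no such entry.
    bool eligible_for_leak_check(QuerySource source) {
        return source == QuerySource::INTERNAL_FRONTEND;
    }
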
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 0fcd4af1ce5..0761b445bee 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -143,10 +143,11 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
     };
 
     if (ctx->put_result.__isset.params) {
-        st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, exec_fragment);
+        st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params,
+                                                           QuerySource::STREAM_LOAD, exec_fragment);
     } else {
         st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params,
-                                                           exec_fragment);
+                                                           QuerySource::STREAM_LOAD, exec_fragment);
     }
 
     if (!st.ok()) {
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 324c21d91ae..e26264b1a22 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -641,7 +641,8 @@ Status BackendService::start_plan_fragment_execution(const TExecPlanFragmentPara
     if (!exec_params.fragment.__isset.output_sink) {
         return Status::InternalError("missing sink in plan fragment");
     }
-    return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params);
+    return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params,
+                                                         
QuerySource::INTERNAL_FRONTEND);
 }
 
 void BackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 013af8e8030..ad8769566c0 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -524,9 +524,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(
             RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
         }
         if (cb) {
-            return _exec_env->fragment_mgr()->exec_plan_fragment(t_request, cb);
+            return _exec_env->fragment_mgr()->exec_plan_fragment(
+                    t_request, QuerySource::INTERNAL_FRONTEND, cb);
         } else {
-            return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+            return _exec_env->fragment_mgr()->exec_plan_fragment(t_request,
+                                                                 QuerySource::INTERNAL_FRONTEND);
         }
     } else if (version == PFragmentRequestVersion::VERSION_2) {
         TExecPlanFragmentParamsList t_request;
@@ -541,9 +543,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(
 
         for (const TExecPlanFragmentParams& params : t_request.paramsList) {
             if (cb) {
-                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params, cb));
+                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
+                        params, QuerySource::INTERNAL_FRONTEND, cb));
             } else {
-                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
+                        params, QuerySource::INTERNAL_FRONTEND));
             }
         }
 
@@ -572,9 +576,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(
         timer.start();
         for (const TPipelineFragmentParams& fragment : fragment_list) {
             if (cb) {
-                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment, cb));
+                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
+                        fragment, QuerySource::INTERNAL_FRONTEND, cb));
             } else {
-                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment));
+                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
+                        fragment, QuerySource::INTERNAL_FRONTEND));
             }
         }
         timer.stop();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 001fbd5a68c..b04b5aa3892 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -141,6 +141,8 @@ import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TFeResult;
 import org.apache.doris.thrift.TFetchResourceResult;
+import org.apache.doris.thrift.TFetchRunningQueriesRequest;
+import org.apache.doris.thrift.TFetchRunningQueriesResult;
 import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
 import org.apache.doris.thrift.TFetchSchemaTableDataResult;
 import org.apache.doris.thrift.TFetchSplitBatchRequest;
@@ -237,6 +239,7 @@ import org.apache.doris.thrift.TTableRef;
 import org.apache.doris.thrift.TTableStatus;
 import org.apache.doris.thrift.TTabletLocation;
 import org.apache.doris.thrift.TTxnParams;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
 import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusRequest;
@@ -4013,4 +4016,24 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         result.setUserinfoList(userInfo);
         return result;
     }
+
+    @Override
+    public TFetchRunningQueriesResult fetchRunningQueries(TFetchRunningQueriesRequest request) {
+        TFetchRunningQueriesResult result = new TFetchRunningQueriesResult();
+        if (!Env.getCurrentEnv().isReady()) {
+            result.setStatus(new TStatus(TStatusCode.ILLEGAL_STATE));
+            return result;
+        }
+
+        List<TUniqueId> runningQueries = Lists.newArrayList();
+        List<Coordinator> allCoordinators = 
QeProcessorImpl.INSTANCE.getAllCoordinators();
+
+        for (Coordinator coordinator : allCoordinators) {
+            runningQueries.add(coordinator.getQueryId());
+        }
+
+        result.setStatus(new TStatus(TStatusCode.OK));
+        result.setRunningQueries(runningQueries);
+        return result;
+    }
 }
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 5e5fd13c36d..7d5b94bd9fe 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -30,6 +30,7 @@ include "RuntimeProfile.thrift"
 include "MasterService.thrift"
 include "AgentService.thrift"
 include "DataSinks.thrift"
+include "HeartbeatService.thrift"
 
 // These are supporting structs for JniFrontend.java, which serves as the glue
 // between our C++ execution environment and the Java frontend.
@@ -1479,6 +1480,14 @@ struct TFetchSplitBatchResult {
     1: optional list<Planner.TScanRangeLocations> splits
 }
 
+struct TFetchRunningQueriesResult {
+    1: optional Status.TStatus status
+    2: optional list<Types.TUniqueId> running_queries
+}
+
+struct TFetchRunningQueriesRequest {
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1569,4 +1578,6 @@ service FrontendService {
     TShowProcessListResult showProcessList(1: TShowProcessListRequest request)
     TShowUserResult showUser(1: TShowUserRequest request)
     TFetchSplitBatchResult fetchSplitBatch(1: TFetchSplitBatchRequest request)
+
+    TFetchRunningQueriesResult fetchRunningQueries(1: TFetchRunningQueriesRequest request)
 }
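
Both thrift structs keep every field optional, which is why the BE helper earlier in
this diff guards each access with __isset before trusting the payload. A caller sketch
against the generated C++ client (assumes client is an already-opened
FrontendServiceClient; connection setup elided):

    // TFetchRunningQueriesRequest is intentionally empty today; keeping it as a
    // struct leaves room to add filters later without breaking the RPC.
    TFetchRunningQueriesRequest request;
    TFetchRunningQueriesResult result;
    client->fetchRunningQueries(result, request);
    if (!result.__isset.status || result.status.status_code != TStatusCode::OK ||
        !result.__isset.running_queries) {
        // Treat as failure: never assume optional fields are present.
    }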

