This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b0f22f044c1 [opt](query cancel) cancel query if it has pipeline task leakage (#39223)
b0f22f044c1 is described below

commit b0f22f044c15cd7b95d1bcb52fde2fa7f6a4e686
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Sat Aug 17 16:40:48 2024 +0800

    [opt](query cancel) cancel query if it has pipeline task leakage (#39223)
    
    * Problem
    
    We are currently facing an issue where pipeline tasks leak in certain
    situations. A pipeline task leak is the scenario in which a query has
    already completed, but its associated data structures still persist on
    the backend (BE). This can leave some memory or computational resources
    on the BE permanently unreleased.
    
    * Fix
    
    We periodically reconcile queries with the Frontend (FE) in the cancel
    worker thread. Once we detect that a query has completed on the FE but
    still exists on the Backend (BE), we cancel the query to promptly
    release its resources. To avoid mistakenly triggering cancellations, we
    employ a conservative strategy: for instance, we do not proactively
    cancel queries if we detect that any FE is in an abnormal state or if
    there are network errors.
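
    In outline, the reconciliation step added to FragmentMgr::cancel_worker()
    behaves like the following sketch (an editor's illustration condensed from
    the diff below; locking, the arrival-timestamp guard, and error handling
    are elided, so treat it as a sketch rather than the exact implementation):

        // Fetch, at most once per pipeline_task_leakage_detect_period_secs,
        // the queries each FE believes are running. An empty map (RPC failure
        // or an abnormal FE) disables leak detection for this round.
        std::map<int64_t, std::unordered_set<TUniqueId>> running_on_fes =
                _get_all_running_queries_from_fe();
        for (const auto& [query_id, weak_ctx] : _query_ctx_map) {
            auto q_ctx = weak_ctx.lock();
            // Skip dead contexts and queries from old FEs (process uuid == 0).
            if (q_ctx == nullptr || q_ctx->get_fe_process_uuid() == 0) continue;
            auto itr = running_on_fes.find(q_ctx->get_fe_process_uuid());
            // Cancel only when the owning FE answered, no longer knows the
            // query, and the query originally came from a frontend.
            if (itr != running_on_fes.end() && itr->second.count(query_id) == 0 &&
                q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
                cancel_query(query_id, Status::Error<ErrorCode::ILLEGAL_STATE>(
                                               "Potential pipeline task leakage"));
            }
        }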
---
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   2 +
 be/src/io/fs/multi_table_pipe.cpp                  |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   6 +
 be/src/runtime/fragment_mgr.cpp                    | 179 +++++++++++++++++++--
 be/src/runtime/fragment_mgr.h                      |  12 +-
 be/src/runtime/frontend_info.h                     |   6 +
 be/src/runtime/group_commit_mgr.cpp                |   3 +-
 be/src/runtime/query_context.cpp                   |  25 ++-
 be/src/runtime/query_context.h                     |  19 ++-
 .../runtime/stream_load/stream_load_executor.cpp   |   5 +-
 be/src/service/backend_service.cpp                 |   3 +-
 be/src/service/internal_service.cpp                |  18 ++-
 .../apache/doris/service/FrontendServiceImpl.java  |  21 +++
 gensrc/thrift/FrontendService.thrift               |  11 ++
 15 files changed, 279 insertions(+), 35 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a671bbea708..78805a58ac6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1330,6 +1330,8 @@ DEFINE_mBool(enable_hdfs_mem_limiter, "true");
 
 DEFINE_mInt16(topn_agg_limit_multiplier, "2");
 
+DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 73d2c24b04d..aeaeea30ae8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1427,6 +1427,8 @@ DECLARE_mBool(enable_hdfs_mem_limiter);
 // we should do agg limit opt
 DECLARE_mInt16(topn_agg_limit_multiplier);
 
+DECLARE_mInt64(pipeline_task_leakage_detect_period_secs);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index d7fdd8a738b..357abee9d0f 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -251,7 +251,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
         _inflight_cnt++;
 
         RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
-                plan, [this, plan](RuntimeState* state, Status* status) {
+                plan, QuerySource::ROUTINE_LOAD, [this, plan](RuntimeState* state, Status* status) {
                     DCHECK(state);
                     auto pair = _planned_tables.find(plan.table_name);
                     if (pair == _planned_tables.end()) {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 928191562b1..c3942e8286e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -176,6 +176,12 @@ void PipelineFragmentContext::cancel(const Status reason) {
     if (reason.is<ErrorCode::TIMEOUT>()) {
         LOG(WARNING) << "PipelineFragmentContext is cancelled due to timeout : " << debug_string();
     }
+
+    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
+        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
+                    this->debug_string());
+    }
+
     _query_ctx->cancel(reason, _fragment_id);
     if (reason.is<ErrorCode::LIMIT_REACH>()) {
         _is_report_on_cancel = false;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 13c33e79c9a..2e69bb34152 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -17,6 +17,7 @@
 
 #include "runtime/fragment_mgr.h"
 
+#include <bits/types/struct_timespec.h>
 #include <bvar/latency_recorder.h>
 #include <exprs/runtime_filter.h>
 #include <fmt/format.h>
@@ -34,6 +35,7 @@
 #include <gen_cpp/internal_service.pb.h>
 #include <pthread.h>
 #include <stddef.h>
+#include <thrift/TApplicationException.h>
 #include <thrift/Thrift.h>
 #include <thrift/protocol/TDebugProtocol.h>
 #include <thrift/transport/TTransportException.h>
@@ -45,11 +47,13 @@
 #include "common/status.h"
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <sstream>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 
 #include "cloud/config.h"
@@ -133,6 +137,103 @@ std::string to_load_error_http_path(const std::string& file_name) {
 using apache::thrift::TException;
 using apache::thrift::transport::TTransportException;
 
+static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
+                                            std::unordered_set<TUniqueId>& query_set) {
+    TFetchRunningQueriesResult rpc_result;
+    TFetchRunningQueriesRequest rpc_request;
+
+    Status client_status;
+    const int32 timeout_ms = 3 * 1000;
+    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
+                                         fe_info.info.coordinator_address, timeout_ms,
+                                         &client_status);
+    // Abort this fe.
+    if (!client_status.ok()) {
+        LOG_WARNING("Failed to get client for {}, reason is {}",
+                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                    client_status.to_string());
+        return Status::InternalError("Failed to get client for {}, reason is {}",
+                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                                     client_status.to_string());
+    }
+
+    // do rpc
+    try {
+        try {
+            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG_WARNING("Transport exception reason: {}, reopening", e.what());
+            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+            if (!client_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack());
+                return Status::InternalError("Reopen failed, reason: {}",
+                                             client_status.to_string_no_stack());
+            }
+
+            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
+        }
+    } catch (apache::thrift::TException& e) {
+        // During upgrading cluster or meet any other network error.
+        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
+                    PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what());
+        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
+                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                                     e.what());
+    }
+
+    // Avoid logic error in frontend.
+    if (rpc_result.__isset.status == false || rpc_result.status.status_code != TStatusCode::OK) {
+        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
+                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                    doris::to_string(rpc_result.status.status_code));
+        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
+                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                                     doris::to_string(rpc_result.status.status_code));
+    }
+
+    if (rpc_result.__isset.running_queries == false) {
+        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
+                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
+                                     "running_queries is not set");
+    }
+
+    query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
+                                              rpc_result.running_queries.end());
+    return Status::OK();
+};
+
+static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() {
+    const std::map<TNetworkAddress, FrontendInfo>& running_fes =
+            ExecEnv::GetInstance()->get_running_frontends();
+
+    std::map<int64_t, std::unordered_set<TUniqueId>> result;
+    std::vector<FrontendInfo> qualified_fes;
+
+    for (const auto& fe : running_fes) {
+        // Only consider normal frontend.
+        if (fe.first.port != 0 && fe.second.info.process_uuid != 0) {
+            qualified_fes.push_back(fe.second);
+        } else {
+            return {};
+        }
+    }
+
+    for (const auto& fe_addr : qualified_fes) {
+        const int64_t process_uuid = fe_addr.info.process_uuid;
+        std::unordered_set<TUniqueId> query_set;
+        Status st = _do_fetch_running_queries_rpc(fe_addr, query_set);
+        if (!st.ok()) {
+            // Empty result, cancel worker will not do anything
+            return {};
+        }
+
+        // frontend_info and process_uuid have been checked in the rpc threads.
+        result[process_uuid] = query_set;
+    }
+
+    return result;
+}
+
 FragmentMgr::FragmentMgr(ExecEnv* exec_env)
         : _exec_env(exec_env), _stop_background_threads_latch(1) {
     _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
@@ -467,11 +568,13 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
 
 static void empty_function(RuntimeState*, Status*) {}
 
-Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
+Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
+                                       const QuerySource query_source) {
     return Status::InternalError("Non-pipeline is disabled!");
 }
 
-Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) {
+Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
+                                       const QuerySource query_source) {
     if (params.txn_conf.need_txn) {
         std::shared_ptr<StreamLoadContext> stream_load_ctx =
                 std::make_shared<StreamLoadContext>(_exec_env);
@@ -503,7 +606,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) {
         RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
         return Status::OK();
     } else {
-        return exec_plan_fragment(params, empty_function);
+        return exec_plan_fragment(params, query_source, empty_function);
     }
 }
 
@@ -566,6 +669,7 @@ std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx_with_lock(
 
 template <typename Params>
 Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
+                                   QuerySource query_source,
                                    std::shared_ptr<QueryContext>& query_ctx) {
     DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed",
                    { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
@@ -597,9 +701,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
 
         // This may be a first fragment request of the query.
         // Create the query fragments context.
-        query_ctx =
-                QueryContext::create_shared(query_id, _exec_env, params.query_options, params.coord,
-                                            pipeline, params.is_nereids, params.current_connect_fe);
+        query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options,
+                                                params.coord, pipeline, params.is_nereids,
+                                                params.current_connect_fe, query_source);
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
         RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
                                               &(query_ctx->desc_tbl)));
@@ -648,7 +752,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
 }
 
 Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
-                                       const FinishCallback& cb) {
+                                       QuerySource query_source, const FinishCallback& cb) {
     return Status::InternalError("Non-pipeline is disabled!");
 }
 
@@ -690,7 +794,7 @@ std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
 }
 
 Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
-                                       const FinishCallback& cb) {
+                                       QuerySource query_source, const FinishCallback& cb) {
     VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is "
              << apache::thrift::ThriftDebugString(params).c_str();
     // sometimes TExecPlanFragmentParams debug string is too long and glog
@@ -699,7 +803,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
              << apache::thrift::ThriftDebugString(params.query_options).c_str();
 
     std::shared_ptr<QueryContext> query_ctx;
-    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
+    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_source, query_ctx));
     SCOPED_ATTACH_TASK(query_ctx.get());
     int64_t duration_ns = 0;
     std::shared_ptr<pipeline::PipelineFragmentContext> context =
@@ -830,12 +934,30 @@ void FragmentMgr::cancel_instance(const TUniqueId instance_id, const Status reas
 
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
+
+    timespec check_invalid_query_last_timestamp;
+    clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
+
     do {
         std::vector<TUniqueId> queries_lost_coordinator;
         std::vector<TUniqueId> queries_timeout;
+        std::vector<TUniqueId> queries_pipeline_task_leak;
+        // Fe process uuid -> set<QueryId>
+        std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes;
+        const std::map<TNetworkAddress, FrontendInfo>& running_fes =
+                ExecEnv::GetInstance()->get_running_frontends();
 
         timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
+
+        if (now.tv_sec - check_invalid_query_last_timestamp.tv_sec >
+            config::pipeline_task_leakage_detect_period_secs) {
+            check_invalid_query_last_timestamp = now;
+            running_queries_on_all_fes = _get_all_running_queries_from_fe();
+        } else {
+            running_queries_on_all_fes.clear();
+        }
+
         {
             std::lock_guard<std::mutex> lock(_lock);
             for (auto& pipeline_itr : _pipeline_map) {
@@ -855,8 +977,6 @@ void FragmentMgr::cancel_worker() {
                 }
             }
 
-            const auto& running_fes = ExecEnv::GetInstance()->get_running_frontends();
-
             // We use a very conservative cancel strategy.
             // 0. If there are no running frontends, do not cancel any queries.
             // 1. If query's process uuid is zero, do not cancel
@@ -870,15 +990,35 @@ void FragmentMgr::cancel_worker() {
             } else {
                 for (const auto& it : _query_ctx_map) {
                     if (auto q_ctx = it.second.lock()) {
-                        if (q_ctx->get_fe_process_uuid() == 0) {
+                        const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
+
+                        if (fe_process_uuid == 0) {
                            // zero means this query is from a older version fe or
                             // this fe is starting
                             continue;
                         }
 
+                        // If the query is not running on any of the frontends, cancel it.
+                        if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
+                            itr != running_queries_on_all_fes.end()) {
+                            // Query not found on this frontend, and the query arrived before the last check
+                            if (itr->second.find(it.first) == itr->second.end() &&
+                                q_ctx->get_query_arrival_timestamp().tv_nsec <
+                                        check_invalid_query_last_timestamp.tv_nsec &&
+                                q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
+                                queries_pipeline_task_leak.push_back(q_ctx->query_id());
+                                LOG_INFO(
+                                        "Query {}, type {} is not found on any frontends, maybe it "
+                                        "is leaked.",
+                                        print_id(q_ctx->query_id()),
+                                        toString(q_ctx->get_query_source()));
+                                continue;
+                            }
+                        }
+
                         auto itr = running_fes.find(q_ctx->coord_addr);
                         if (itr != running_fes.end()) {
-                            if (q_ctx->get_fe_process_uuid() == itr->second.info.process_uuid ||
+                            if (fe_process_uuid == itr->second.info.process_uuid ||
                                 itr->second.info.process_uuid == 0) {
                                 continue;
                             } else {
@@ -932,9 +1072,18 @@ void FragmentMgr::cancel_worker() {
                                  "FragmentMgr cancel worker going to cancel 
timeout instance "));
         }
 
+        for (const auto& qid : queries_pipeline_task_leak) {
+            // Cancel the query, and maybe try to report debug info to the FE so that we can
+            // collect debug info via SQL or HTTP API instead of searching logs.
+            cancel_query(qid, Status::Error<ErrorCode::ILLEGAL_STATE>(
+                                      "Potential pipeline task leakage"));
+        }
+
         for (const auto& qid : queries_lost_coordinator) {
-            cancel_query(qid, Status::InternalError("Coordinator dead."));
+            cancel_query(qid, Status::Error<ErrorCode::CANCELLED>(
+                                      "Source frontend is not running or restarted"));
         }
+
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
     LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
 }
@@ -1032,7 +1181,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
     exec_fragment_params.__set_query_options(query_options);
     VLOG_ROW << "external exec_plan_fragment params is "
             << apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
-    return exec_plan_fragment(exec_fragment_params);
+    return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR);
 }
 
 Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index cb32bc5e77e..bc066066f7b 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -81,17 +81,19 @@ public:
     void stop();
 
     // execute one plan fragment
-    Status exec_plan_fragment(const TExecPlanFragmentParams& params);
+    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type);
 
-    Status exec_plan_fragment(const TPipelineFragmentParams& params);
+    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type);
 
     void remove_pipeline_context(
            std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context);
 
     // TODO(zc): report this is over
-    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const FinishCallback& cb);
+    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type,
+                              const FinishCallback& cb);
 
-    Status exec_plan_fragment(const TPipelineFragmentParams& params, const FinishCallback& cb);
+    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
+                              const FinishCallback& cb);
 
     Status start_query_execution(const PExecPlanFragmentStartRequest* request);
 
@@ -155,7 +157,7 @@ private:
 
     template <typename Params>
    Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
-                          std::shared_ptr<QueryContext>& query_ctx);
+                          QuerySource query_type, std::shared_ptr<QueryContext>& query_ctx);
 
     // This is input params
     ExecEnv* _exec_env = nullptr;
diff --git a/be/src/runtime/frontend_info.h b/be/src/runtime/frontend_info.h
index a7e4b3f999b..c4d3d710b3c 100644
--- a/be/src/runtime/frontend_info.h
+++ b/be/src/runtime/frontend_info.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <gen_cpp/HeartbeatService_types.h>
+#include <gen_cpp/Types_types.h>
 
 #include <ctime>
 
@@ -28,4 +29,9 @@ struct FrontendInfo {
     std::time_t last_reveiving_time_ms;
 };
 
+struct FrontendAddrAndRunningQueries {
+    TNetworkAddress frontend_addr;
+    std::set<TUniqueId> running_queries;
+};
+
 } // namespace doris
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 2383f25afc8..3250379cf85 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -582,7 +582,8 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id,
                          << ", st=" << finish_st.to_string();
         }
     };
-    return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, finish_cb);
+    return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params,
+                                                         QuerySource::GROUP_COMMIT_LOAD, finish_cb);
 }
 
 Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 04bcb16cc8a..f10a0a5edca 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -55,15 +55,34 @@ public:
     std::unique_ptr<ThreadPoolToken> token_;
 };
 
+const std::string toString(QuerySource queryType) {
+    switch (queryType) {
+    case QuerySource::INTERNAL_FRONTEND:
+        return "INTERNAL_FRONTEND";
+    case QuerySource::STREAM_LOAD:
+        return "STREAM_LOAD";
+    case QuerySource::GROUP_COMMIT_LOAD:
+        return "EXTERNAL_QUERY";
+    case QuerySource::ROUTINE_LOAD:
+        return "ROUTINE_LOAD";
+    case QuerySource::EXTERNAL_CONNECTOR:
+        return "EXTERNAL_CONNECTOR";
+    default:
+        return "UNKNOWN";
+    }
+}
+
 QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
                            const TQueryOptions& query_options, TNetworkAddress coord_addr,
-                           bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe)
+                           bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe,
+                           QuerySource query_source)
         : _timeout_second(-1),
           _query_id(query_id),
           _exec_env(exec_env),
           _is_pipeline(is_pipeline),
           _is_nereids(is_nereids),
-          _query_options(query_options) {
+          _query_options(query_options),
+          _query_source(query_source) {
     _init_query_mem_tracker();
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
     _query_watcher.start();
@@ -89,7 +108,7 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
                !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
         DCHECK_EQ(is_report_fe_addr_valid, true);
     }
-
+    clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
     register_memory_statistics();
     register_cpu_statistics();
 }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 3241010c20e..006305bf599 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -63,6 +63,16 @@ struct ReportStatusRequest {
     std::function<void(const Status&)> cancel_fn;
 };
 
+enum class QuerySource {
+    INTERNAL_FRONTEND,
+    STREAM_LOAD,
+    GROUP_COMMIT_LOAD,
+    ROUTINE_LOAD,
+    EXTERNAL_CONNECTOR
+};
+
+const std::string toString(QuerySource query_source);
+
 // Save the common components of fragments in a query.
 // Some components like DescriptorTbl may be very large
 // that will slow down each execution of fragments when DeSer them every time.
@@ -73,7 +83,7 @@ class QueryContext {
 public:
     QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options,
                  TNetworkAddress coord_addr, bool is_pipeline, bool is_nereids,
-                 TNetworkAddress current_connect_fe);
+                 TNetworkAddress current_connect_fe, QuerySource query_type);
 
     ~QueryContext();
 
@@ -310,6 +320,10 @@ private:
     std::atomic<int64_t> _spill_threshold {0};
 
     std::mutex _profile_mutex;
+    timespec _query_arrival_timestamp;
+    // Distinguishes the query source; for queries that come from an FE, there are memory
+    // structures on the FE that help us manage the query.
+    QuerySource _query_source;
 
    // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
     // flatten profile of one fragment:
@@ -348,6 +362,9 @@ public:
     bool enable_profile() const {
        return _query_options.__isset.enable_profile && _query_options.enable_profile;
     }
+
+    timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
+    QuerySource get_query_source() const { return this->_query_source; }
 };
 
 } // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 4ddd29ac9c3..4b0788186a0 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -147,10 +147,11 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
 
     if (ctx->put_result.__isset.params) {
-        st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params, exec_fragment);
+        st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params,
+                                                           QuerySource::STREAM_LOAD, exec_fragment);
     } else {
         st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.pipeline_params,
-                                                           exec_fragment);
+                                                           QuerySource::STREAM_LOAD, exec_fragment);
     }
 
     if (!st.ok()) {
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 9b63439a634..aa29661da02 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -653,7 +653,8 @@ Status BaseBackendService::start_plan_fragment_execution(
     if (!exec_params.fragment.__isset.output_sink) {
         return Status::InternalError("missing sink in plan fragment");
     }
-    return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params);
+    return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params,
+                                                         QuerySource::INTERNAL_FRONTEND);
 }
 
 void BaseBackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 2492d2a846b..c2251c240ae 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -514,9 +514,11 @@ Status PInternalService::_exec_plan_fragment_impl(
            RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
         }
         if (cb) {
-            return _exec_env->fragment_mgr()->exec_plan_fragment(t_request, cb);
+            return _exec_env->fragment_mgr()->exec_plan_fragment(
+                    t_request, QuerySource::INTERNAL_FRONTEND, cb);
         } else {
-            return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+            return _exec_env->fragment_mgr()->exec_plan_fragment(t_request,
+                                                                 QuerySource::INTERNAL_FRONTEND);
         }
     } else if (version == PFragmentRequestVersion::VERSION_2) {
         TExecPlanFragmentParamsList t_request;
@@ -531,9 +533,11 @@ Status PInternalService::_exec_plan_fragment_impl(
 
         for (const TExecPlanFragmentParams& params : t_request.paramsList) {
             if (cb) {
-                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params, cb));
+                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
+                        params, QuerySource::INTERNAL_FRONTEND, cb));
             } else {
-                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
+                        params, QuerySource::INTERNAL_FRONTEND));
             }
         }
 
@@ -562,9 +566,11 @@ Status PInternalService::_exec_plan_fragment_impl(
         timer.start();
         for (const TPipelineFragmentParams& fragment : fragment_list) {
             if (cb) {
-                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment, cb));
+                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
+                        fragment, QuerySource::INTERNAL_FRONTEND, cb));
             } else {
-                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment));
+                RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
+                        fragment, QuerySource::INTERNAL_FRONTEND));
             }
         }
         timer.stop();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 1e6d8e987c1..f64f55a47fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -143,6 +143,8 @@ import org.apache.doris.thrift.TDropPlsqlPackageRequest;
 import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest;
 import org.apache.doris.thrift.TFeResult;
 import org.apache.doris.thrift.TFetchResourceResult;
+import org.apache.doris.thrift.TFetchRunningQueriesRequest;
+import org.apache.doris.thrift.TFetchRunningQueriesResult;
 import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
 import org.apache.doris.thrift.TFetchSchemaTableDataResult;
 import org.apache.doris.thrift.TFetchSplitBatchRequest;
@@ -3982,4 +3984,23 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return new TStatus(TStatusCode.OK);
     }
 
+    @Override
+    public TFetchRunningQueriesResult fetchRunningQueries(TFetchRunningQueriesRequest request) {
+        TFetchRunningQueriesResult result = new TFetchRunningQueriesResult();
+        if (!Env.getCurrentEnv().isReady()) {
+            result.setStatus(new TStatus(TStatusCode.ILLEGAL_STATE));
+            return result;
+        }
+
+        List<TUniqueId> runningQueries = Lists.newArrayList();
+        List<Coordinator> allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators();
+
+        for (Coordinator coordinator : allCoordinators) {
+            runningQueries.add(coordinator.getQueryId());
+        }
+
+        result.setStatus(new TStatus(TStatusCode.OK));
+        result.setRunningQueries(runningQueries);
+        return result;
+    }
 }
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index a3d2ad26967..b6e4aacf656 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -30,6 +30,7 @@ include "RuntimeProfile.thrift"
 include "MasterService.thrift"
 include "AgentService.thrift"
 include "DataSinks.thrift"
+include "HeartbeatService.thrift"
 
 // These are supporting structs for JniFrontend.java, which serves as the glue
 // between our C++ execution environment and the Java frontend.
@@ -1561,6 +1562,14 @@ struct TFetchSplitBatchResult {
     2: optional Status.TStatus status
 }
 
+struct TFetchRunningQueriesResult {
+    1: optional Status.TStatus status
+    2: optional list<Types.TUniqueId> running_queries
+}
+
+struct TFetchRunningQueriesRequest {
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1655,4 +1664,6 @@ service FrontendService {
 
     TFetchSplitBatchResult fetchSplitBatch(1: TFetchSplitBatchRequest request)
    Status.TStatus updatePartitionStatsCache(1: TUpdateFollowerPartitionStatsCacheRequest request)
+
+    TFetchRunningQueriesResult fetchRunningQueries(1: TFetchRunningQueriesRequest request)
 }

