This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new af68d8b5077 [feat](profile) support getting query progress (#51400)
af68d8b5077 is described below

commit af68d8b5077782d65ec9cadba479ed58b8789e90
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Sat Jun 21 00:39:30 2025 +0800

    [feat](profile) support getting query progress (#51400)
    
    ### What problem does this PR solve?
    
    Followup #50791
    
    Add a new FE HTTP API: `/rest/v2/manager/query/statistics/{trace_id}`.
    This API returns the runtime query statistics corresponding to a
    given trace id.
    The query statistics include info such as real-time scan rows/bytes.
    
    Internally, Doris looks up the query id for the trace id on all Frontends,
    and then fetches the query statistics from the BEs.
    
    Usage pattern:
    1. The user sets a custom trace id with: `set
    session_context="trace_id:my_trace_id"`
    2. The user executes a query in the same session.
    3. Start an HTTP client to get the query statistics in real time while
    the query is running.
    
    
    
![progress](https://github.com/user-attachments/assets/0a697c7d-d87a-4e9c-8965-c5a2d7d7836e)
    
    Also fix a bug in `CoordinatorContext.java` (introduced by #41730) so
    that it gets the real host.
    
    This PR also changes the column names of the `information_schema.processlist`
    table to be the same as the column
    names in `show processlist`.
---
 .../schema_scanner/schema_processlist_scanner.cpp  |  39 +++---
 be/src/runtime/fragment_mgr.cpp                    |   9 ++
 be/src/runtime/fragment_mgr.h                      |   2 +
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  13 ++
 be/src/runtime/runtime_query_statistics_mgr.h      |   3 +-
 be/src/service/backend_service.cpp                 |  25 ++--
 .../java/org/apache/doris/catalog/SchemaTable.java |  32 ++---
 .../doris/common/profile/ProfileManager.java       |  68 +++++++++--
 .../doris/httpv2/controller/SessionController.java |  18 +--
 .../doris/httpv2/rest/manager/HttpUtils.java       |   6 +
 .../httpv2/rest/manager/QueryProfileAction.java    | 133 +++++++++++++++++----
 .../plans/commands/ShowProcessListCommand.java     |  28 ++---
 .../java/org/apache/doris/qe/ConnectContext.java   |   6 +-
 .../java/org/apache/doris/qe/ConnectPoolMgr.java   |   9 ++
 .../java/org/apache/doris/qe/ConnectScheduler.java |   5 +
 .../org/apache/doris/qe/CoordinatorContext.java    |   2 +-
 .../org/apache/doris/qe/ConnectContextTest.java    |   2 +-
 gensrc/thrift/BackendService.thrift                |   3 +
 .../plugins/plugin_curl_requester.groovy           |  10 +-
 ...e_compaction_with_variant_inverted_index.groovy |  12 +-
 .../test_information_schema_timezone.groovy        |   4 +-
 .../suites/manager/test_manager_interface_5.groovy |  99 +++++++++++++++
 .../suites/show_p0/test_show_processlist.groovy    |  29 ++++-
 23 files changed, 438 insertions(+), 119 deletions(-)

diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp 
b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
index 7acb82ea849..69367556767 100644
--- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
@@ -33,20 +33,21 @@ namespace doris {
 #include "common/compile_check_begin.h"
 
 std::vector<SchemaScanner::ColumnDesc> 
SchemaProcessListScanner::_s_processlist_columns = {
-        {"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"ID", TYPE_LARGEINT, sizeof(int128_t), false},
-        {"USER", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"HOST", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"LOGIN_TIME", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false},
-        {"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"DB", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"TIME", TYPE_INT, sizeof(int32_t), false},
-        {"STATE", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"INFO", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"FE", TYPE_VARCHAR, sizeof(StringRef), false},
-        {"CLOUD_CLUSTER", TYPE_VARCHAR, sizeof(StringRef), false}};
+        {"CurrentConnected", TYPE_VARCHAR, sizeof(StringRef), false},       // 0
+        {"Id", TYPE_LARGEINT, sizeof(int128_t), false},                     // 
1
+        {"User", TYPE_VARCHAR, sizeof(StringRef), false},                   // 
2
+        {"Host", TYPE_VARCHAR, sizeof(StringRef), false},                   // 
3
+        {"LoginTime", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false}, // 
4
+        {"Catalog", TYPE_VARCHAR, sizeof(StringRef), false},                // 
5
+        {"Db", TYPE_VARCHAR, sizeof(StringRef), false},                     // 
6
+        {"Command", TYPE_VARCHAR, sizeof(StringRef), false},                // 
7
+        {"Time", TYPE_INT, sizeof(int32_t), false},                         // 
8
+        {"State", TYPE_VARCHAR, sizeof(StringRef), false},                  // 
9
+        {"QueryId", TYPE_VARCHAR, sizeof(StringRef), false},                // 
10
+        {"TraceId", TYPE_VARCHAR, sizeof(StringRef), false},                // 
11
+        {"Info", TYPE_VARCHAR, sizeof(StringRef), false},                   // 
12
+        {"FE", TYPE_VARCHAR, sizeof(StringRef), false},                     // 
13
+        {"CloudCluster", TYPE_VARCHAR, sizeof(StringRef), false}};          // 
14
 
 SchemaProcessListScanner::SchemaProcessListScanner()
         : SchemaScanner(_s_processlist_columns, 
TSchemaTableType::SCH_PROCESSLIST) {}
@@ -62,6 +63,16 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
         TShowProcessListResult tmp_ret;
         RETURN_IF_ERROR(
                 SchemaHelper::show_process_list(fe_addr.hostname, 
fe_addr.port, request, &tmp_ret));
+
+        // Check and adjust the number of columns in each row to ensure 15 
columns
+        // This is compatible with newly added column "trace id". #51400
+        for (auto& row : tmp_ret.process_list) {
+            if (row.size() == 14) {
+                // Insert an empty string at position 11 (index 11) for the 
TRACE_ID column
+                row.insert(row.begin() + 11, "");
+            }
+        }
+
         
_process_list_result.process_list.insert(_process_list_result.process_list.end(),
                                                  tmp_ret.process_list.begin(),
                                                  tmp_ret.process_list.end());
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 230cb12a110..90ec55ac0b5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1382,4 +1382,13 @@ Status FragmentMgr::get_realtime_exec_status(const 
TUniqueId& query_id,
     return Status::OK();
 }
 
+Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, 
TQueryStatistics* query_stats) {
+    if (query_stats == nullptr) {
+        return Status::InvalidArgument("query_stats is nullptr");
+    }
+
+    return 
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_query_statistics(
+            print_id(query_id), query_stats);
+}
+
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 4eab31ff3a2..e6f388f73e1 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -184,6 +184,8 @@ public:
 
     Status get_realtime_exec_status(const TUniqueId& query_id,
                                     TReportExecStatusParams* exec_status);
+    // get the query statistics of with a given query id
+    Status get_query_statistics(const TUniqueId& query_id, TQueryStatistics* 
query_stats);
 
     std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
 
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 79db2cfec56..772925dcb7e 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -511,6 +511,19 @@ void 
RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
     }
 }
 
+Status RuntimeQueryStatisticsMgr::get_query_statistics(const std::string& 
query_id,
+                                                       TQueryStatistics* 
query_stats) {
+    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
+
+    auto resource_ctx = _resource_contexts_map.find(query_id);
+    if (resource_ctx == _resource_contexts_map.end()) {
+        return Status::InternalError("failed to find query with id {}", 
query_id);
+    }
+
+    resource_ctx->second->to_thrift_query_statistics(query_stats);
+    return Status::OK();
+}
+
 void RuntimeQueryStatisticsMgr::get_tasks_resource_context(
         std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs) {
     std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h 
b/be/src/runtime/runtime_query_statistics_mgr.h
index ef80b6fa5b9..6b2e4545390 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -55,6 +55,7 @@ public:
 
     // used for backend_active_tasks
     void get_active_be_tasks_block(vectorized::Block* block);
+    Status get_query_statistics(const std::string& query_id, TQueryStatistics* 
query_stats);
 
     // used for MemoryReclamation
     void 
get_tasks_resource_context(std::vector<std::shared_ptr<ResourceContext>>& 
resource_ctxs);
@@ -95,4 +96,4 @@ private:
     std::unique_ptr<ThreadPool> _thread_pool;
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index 114d99fe264..33fa4401391 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -1321,19 +1321,28 @@ void 
BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse
 
     std::unique_ptr<TReportExecStatusParams> report_exec_status_params =
             std::make_unique<TReportExecStatusParams>();
-    Status st = 
ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
-            request.id, report_exec_status_params.get());
+    std::unique_ptr<TQueryStatistics> query_stats = 
std::make_unique<TQueryStatistics>();
 
-    if (!st.ok()) {
-        response.__set_status(st.to_thrift());
-        return;
+    std::string req_type = request.__isset.req_type ? request.req_type : 
"profile";
+    Status st;
+    if (req_type == "stats") {
+        st = 
ExecEnv::GetInstance()->fragment_mgr()->get_query_statistics(request.id,
+                                                                          
query_stats.get());
+        if (st.ok()) {
+            response.__set_query_stats(*query_stats);
+        }
+    } else {
+        // default is "profile"
+        st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
+                request.id, report_exec_status_params.get());
+        if (st.ok()) {
+            
response.__set_report_exec_status_params(*report_exec_status_params);
+        }
     }
 
     report_exec_status_params->__set_query_id(TUniqueId());
     report_exec_status_params->__set_done(false);
-
-    response.__set_status(Status::OK().to_thrift());
-    response.__set_report_exec_status_params(*report_exec_status_params);
+    response.__set_status(st.to_thrift());
 }
 
 void BaseBackendService::get_dictionary_status(TDictionaryStatusList& result,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 3d7f9603d97..ca42d7af40f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -523,22 +523,24 @@ public class SchemaTable extends Table {
                                     .column("IS_MUTABLE", 
ScalarType.createType(PrimitiveType.BOOLEAN))
                                     .build()))
             .put("processlist",
+                    // ATTN, the column name should be compatible with MySQL
+                    // See: 
https://dev.mysql.com/doc/refman/8.4/en/show-processlist.html
                     new SchemaTable(SystemIdGenerator.getNextId(), 
"processlist", TableType.SCHEMA,
-                            builder().column("CURRENT_CONNECTED", 
ScalarType.createVarchar(16))
-                                    .column("ID", 
ScalarType.createType(PrimitiveType.LARGEINT))
-                                    .column("USER", 
ScalarType.createVarchar(32))
-                                    .column("HOST", 
ScalarType.createVarchar(261))
-                                    .column("LOGIN_TIME", 
ScalarType.createType(PrimitiveType.DATETIMEV2))
-                                    .column("CATALOG", 
ScalarType.createVarchar(64))
-                                    .column("DB", ScalarType.createVarchar(64))
-                                    .column("COMMAND", 
ScalarType.createVarchar(16))
-                                    .column("TIME", 
ScalarType.createType(PrimitiveType.INT))
-                                    .column("STATE", 
ScalarType.createVarchar(64))
-                                    .column("QUERY_ID", 
ScalarType.createVarchar(256))
-                                    .column("INFO", 
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
-                                    .column("FE",
-                                            ScalarType.createVarchar(64))
-                                    .column("CLOUD_CLUSTER", 
ScalarType.createVarchar(64)).build(), true))
+                            builder().column("CurrentConnected", 
ScalarType.createVarchar(16))
+                                    .column("Id", 
ScalarType.createType(PrimitiveType.LARGEINT))
+                                    .column("User", 
ScalarType.createVarchar(32))
+                                    .column("Host", 
ScalarType.createVarchar(261))
+                                    .column("LoginTime", 
ScalarType.createType(PrimitiveType.DATETIMEV2))
+                                    .column("Catalog", 
ScalarType.createVarchar(64))
+                                    .column("Db", ScalarType.createVarchar(64))
+                                    .column("Command", 
ScalarType.createVarchar(16))
+                                    .column("Time", 
ScalarType.createType(PrimitiveType.INT))
+                                    .column("State", 
ScalarType.createVarchar(64))
+                                    .column("QueryId", 
ScalarType.createVarchar(256))
+                                    .column("TraceId", 
ScalarType.createVarchar(256))
+                                    .column("Info", 
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
+                                    .column("FE", ScalarType.createVarchar(64))
+                                    .column("CloudCluster", 
ScalarType.createVarchar(64)).build(), true))
             .put("workload_policy",
                     new SchemaTable(SystemIdGenerator.getNextId(), 
"workload_policy", TableType.SCHEMA,
                             builder().column("ID", 
ScalarType.createType(PrimitiveType.BIGINT))
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
index 5666ca8965a..36aa9a68b25 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
@@ -32,9 +32,11 @@ import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TGetRealtimeExecStatusRequest;
 import org.apache.doris.thrift.TGetRealtimeExecStatusResponse;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryStatistics;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -253,7 +255,7 @@ public class ProfileManager extends MasterDaemon {
     }
 
     private static TGetRealtimeExecStatusResponse getRealtimeQueryProfile(
-            TUniqueId queryID, TNetworkAddress targetBackend) {
+            TUniqueId queryID, String reqType, TNetworkAddress targetBackend) {
         TGetRealtimeExecStatusResponse resp = null;
         BackendService.Client client = null;
 
@@ -268,6 +270,7 @@ public class ProfileManager extends MasterDaemon {
         try {
             TGetRealtimeExecStatusRequest req = new 
TGetRealtimeExecStatusRequest();
             req.setId(queryID);
+            req.setReqType(reqType);
             resp = client.getRealtimeExecStatus(req);
         } catch (TException e) {
             LOG.warn("Got exception when getRealtimeExecStatus, query {} 
backend {}",
@@ -293,8 +296,8 @@ public class ProfileManager extends MasterDaemon {
             return null;
         }
 
-        if (!resp.isSetReportExecStatusParams()) {
-            LOG.warn("Invalid GetRealtimeExecStatusResponse, query {}",
+        if (!resp.isSetReportExecStatusParams() && !resp.isSetQueryStats()) {
+            LOG.warn("Invalid GetRealtimeExecStatusResponse, missing both exec 
status and query stats. query {}",
                     DebugUtil.printId(queryID));
             return null;
         }
@@ -302,7 +305,7 @@ public class ProfileManager extends MasterDaemon {
         return resp;
     }
 
-    private List<Future<TGetRealtimeExecStatusResponse>> 
createFetchRealTimeProfileTasks(String id) {
+    private List<Future<TGetRealtimeExecStatusResponse>> 
createFetchRealTimeProfileTasks(String id, String reqType) {
         // For query, id is queryId, for load, id is LoadLoadingTaskId
         class QueryIdAndAddress {
             public TUniqueId id;
@@ -365,9 +368,8 @@ public class ProfileManager extends MasterDaemon {
         }
 
         for (QueryIdAndAddress idAndAddress : involvedBackends) {
-            Callable<TGetRealtimeExecStatusResponse> task = () -> {
-                return getRealtimeQueryProfile(idAndAddress.id, 
idAndAddress.beAddress);
-            };
+            Callable<TGetRealtimeExecStatusResponse> task = () -> 
getRealtimeQueryProfile(idAndAddress.id,
+                    reqType, idAndAddress.beAddress);
             Future<TGetRealtimeExecStatusResponse> future = 
fetchRealTimeProfileExecutor.submit(task);
             futures.add(future);
         }
@@ -375,8 +377,57 @@ public class ProfileManager extends MasterDaemon {
         return futures;
     }
 
+    public TQueryStatistics getQueryStatistic(String queryId) throws Exception 
{
+        List<Future<TGetRealtimeExecStatusResponse>> futures = 
createFetchRealTimeProfileTasks(queryId,
+                "stats");
+        List<TQueryStatistics> queryStatisticsList = Lists.newArrayList();
+        for (Future<TGetRealtimeExecStatusResponse> future : futures) {
+            try {
+                TGetRealtimeExecStatusResponse resp = future.get(5, 
TimeUnit.SECONDS);
+                if (resp != null && resp.getStatus().status_code == 
TStatusCode.OK && resp.isSetQueryStats()) {
+                    queryStatisticsList.add(resp.getQueryStats());
+                } else {
+                    LOG.warn("Failed to get real-time query stats, id {}, resp 
is {}",
+                            queryId, resp == null ? "null" : resp.toString());
+                    throw new Exception("Failed to get realtime query stats: " 
+ resp.toString());
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to get real-time query stats, id {}, error: 
{}", queryId, e.getMessage(), e);
+                throw new Exception("Failed to get realtime query stats: " + 
e.getMessage());
+            }
+        }
+        Preconditions.checkState(!queryStatisticsList.isEmpty() && 
queryStatisticsList.size() == futures.size(),
+                String.format("Failed to get real-time stats, id %s, "
+                                + "queryStatisticsList size %d != futures size 
%d",
+                        queryId, queryStatisticsList.size(), futures.size()));
+
+        TQueryStatistics summary = new TQueryStatistics();
+        for (TQueryStatistics queryStats : queryStatisticsList) {
+            // sum all the statistics
+            summary.setScanRows(summary.getScanRows() + 
queryStats.getScanRows());
+            summary.setScanBytes(summary.getScanBytes() + 
queryStats.getScanBytes());
+            summary.setReturnedRows(summary.getReturnedRows() + 
queryStats.getReturnedRows());
+            summary.setCpuMs(summary.getCpuMs() + queryStats.getCpuMs());
+            
summary.setMaxPeakMemoryBytes(Math.max(summary.getMaxPeakMemoryBytes(),
+                    queryStats.getMaxPeakMemoryBytes()));
+            
summary.setCurrentUsedMemoryBytes(Math.max(summary.getCurrentUsedMemoryBytes(),
+                    queryStats.getCurrentUsedMemoryBytes()));
+            summary.setShuffleSendBytes(summary.getShuffleSendBytes() + 
queryStats.getShuffleSendBytes());
+            summary.setShuffleSendRows(summary.getShuffleSendRows() + 
queryStats.getShuffleSendRows());
+            summary.setScanBytesFromLocalStorage(
+                    summary.getScanBytesFromLocalStorage() + 
queryStats.getScanBytesFromLocalStorage());
+            summary.setScanBytesFromRemoteStorage(
+                    summary.getScanBytesFromRemoteStorage() + 
queryStats.getScanBytesFromRemoteStorage());
+            summary.setSpillWriteBytesToLocalStorage(
+                    summary.getSpillWriteBytesToLocalStorage() + 
queryStats.getSpillWriteBytesToLocalStorage());
+            summary.setSpillReadBytesFromLocalStorage(
+                    summary.getSpillReadBytesFromLocalStorage() + 
queryStats.getSpillReadBytesFromLocalStorage());
+        }
+        return summary;
+    }
+
     public String getProfile(String id) {
-        List<Future<TGetRealtimeExecStatusResponse>> futures = 
createFetchRealTimeProfileTasks(id);
+        List<Future<TGetRealtimeExecStatusResponse>> futures = 
createFetchRealTimeProfileTasks(id, "profile");
         // beAddr of reportExecStatus of QeProcessorImpl is meaningless, so 
assign a dummy address
         // to avoid compile failing.
         TNetworkAddress dummyAddr = new TNetworkAddress();
@@ -1057,3 +1108,4 @@ public class ProfileManager extends MasterDaemon {
         }
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
index 8d9f791c0b0..e21e61b4ed9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
@@ -18,6 +18,8 @@
 package org.apache.doris.httpv2.controller;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.SchemaTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.httpv2.entity.ResponseBody;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.rest.RestBaseController;
@@ -56,20 +58,8 @@ public class SessionController extends RestBaseController {
     private static final Logger LOG = 
LogManager.getLogger(SessionController.class);
 
     static {
-        SESSION_TABLE_HEADER.add("CurrentConnected");
-        SESSION_TABLE_HEADER.add("Id");
-        SESSION_TABLE_HEADER.add("User");
-        SESSION_TABLE_HEADER.add("Host");
-        SESSION_TABLE_HEADER.add("LoginTime");
-        SESSION_TABLE_HEADER.add("Catalog");
-        SESSION_TABLE_HEADER.add("Db");
-        SESSION_TABLE_HEADER.add("Command");
-        SESSION_TABLE_HEADER.add("Time");
-        SESSION_TABLE_HEADER.add("State");
-        SESSION_TABLE_HEADER.add("QueryId");
-        SESSION_TABLE_HEADER.add("Info");
-        SESSION_TABLE_HEADER.add("FE");
-        SESSION_TABLE_HEADER.add("CloudCluster");
+        Table tbl = SchemaTable.TABLE_MAP.get("processlist");
+        tbl.getBaseSchema().stream().forEach(column -> 
SESSION_TABLE_HEADER.add(column.getName()));
     }
 
     @RequestMapping(path = "/session/all", method = RequestMethod.GET)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java
index 8caab8df2d9..4330e4ace56 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.httpv2.entity.ResponseBody;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService.HostInfo;
 
 import com.google.common.base.Strings;
 import com.google.gson.reflect.TypeToken;
@@ -57,6 +58,11 @@ public class HttpUtils {
                 .collect(Collectors.toList());
     }
 
+    static boolean isCurrentFe(String ip, int port) {
+        HostInfo hostInfo = Env.getCurrentEnv().getSelfNode();
+        return hostInfo.isSame(new HostInfo(ip, port));
+    }
+
     static String concatUrl(Pair<String, Integer> ipPort, String path, 
Map<String, String> arguments) {
         StringBuilder url = new StringBuilder("http://";)
                 
.append(ipPort.first).append(":").append(ipPort.second).append(path);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
index 0886edb56fb..829ffe05b06 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
@@ -38,6 +38,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.thrift.TQueryStatistics;
 import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.base.Strings;
@@ -45,8 +46,11 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.NotNull;
@@ -307,7 +311,31 @@ public class QueryProfileAction extends RestBaseController 
{
             @RequestParam(value = IS_ALL_NODE_PARA, required = false, 
defaultValue = "true") boolean isAllNode) {
         executeCheckPassword(request, response);
 
+        try {
+            String queryId = getQueryIdByTraceIdImpl(request, traceId, 
isAllNode);
+            return ResponseEntityBuilder.ok(queryId);
+        } catch (Exception e) {
+            return ResponseEntityBuilder.badRequest(e.getMessage());
+        }
+    }
+
+    /**
+     * Get query id by trace id.
+     * return a non-empty query id corresponding to the trace id.
+     * Will throw an exception if the trace id is not found, or query id is 
empty, or user does not have permission.
+     */
+    private String getQueryIdByTraceIdImpl(HttpServletRequest request, String 
traceId, boolean isAllNode)
+            throws Exception {
+        // Get query id by trace id in current FE
+        ExecuteEnv env = ExecuteEnv.getInstance();
+        String queryId = env.getScheduler().getQueryIdByTraceId(traceId);
+        if (!Strings.isNullOrEmpty(queryId)) {
+            checkAuthByUserAndQueryId(queryId);
+            return queryId;
+        }
+
         if (isAllNode) {
+            // If the query id is not found in current FE, try to get it from 
other FE
             String httpPath = "/rest/v2/manager/query/trace_id/" + traceId;
             ImmutableMap<String, String> arguments =
                     ImmutableMap.<String, 
String>builder().put(IS_ALL_NODE_PARA, "false").build();
@@ -315,33 +343,29 @@ public class QueryProfileAction extends 
RestBaseController {
             ImmutableMap<String, String> header = ImmutableMap.<String, 
String>builder()
                     .put(NodeAction.AUTHORIZATION, 
request.getHeader(NodeAction.AUTHORIZATION)).build();
             for (Pair<String, Integer> ipPort : frontends) {
+                if (HttpUtils.isCurrentFe(ipPort.first, ipPort.second)) {
+                    // skip current FE.
+                    continue;
+                }
                 String url = HttpUtils.concatUrl(ipPort, httpPath, arguments);
-                try {
-                    String responseJson = HttpUtils.doGet(url, header);
-                    int code = 
JsonParser.parseString(responseJson).getAsJsonObject().get("code").getAsInt();
-                    if (code == HttpUtils.REQUEST_SUCCESS_CODE) {
-                        return responseJson;
+                String responseJson = HttpUtils.doGet(url, header);
+                JsonObject jObj = 
JsonParser.parseString(responseJson).getAsJsonObject();
+                int code = jObj.get("code").getAsInt();
+                if (code == HttpUtils.REQUEST_SUCCESS_CODE) {
+                    if (!jObj.has("data") || jObj.get("data").isJsonNull() || 
Strings.isNullOrEmpty(
+                            jObj.get("data").getAsString())) {
+                        throw new Exception(String.format("trace id %s not 
found", traceId));
                     }
-                } catch (Exception e) {
-                    LOG.warn(e);
+                    return jObj.get("data").getAsString();
                 }
+                LOG.warn("get query id by trace id error, resp: {}", 
responseJson);
+                // If the response code is not success, it means that the 
trace id is not found in this FE.
+                // Continue to try the next FE.
             }
-        } else {
-            ExecuteEnv env = ExecuteEnv.getInstance();
-            String queryId = env.getScheduler().getQueryIdByTraceId(traceId);
-            if (Strings.isNullOrEmpty(queryId)) {
-                return ResponseEntityBuilder.badRequest("Not found");
-            }
-
-            try {
-                checkAuthByUserAndQueryId(queryId);
-            } catch (AuthenticationException e) {
-                return ResponseEntityBuilder.badRequest(e.getMessage());
-            }
-
-            return ResponseEntityBuilder.ok(queryId);
         }
-        return ResponseEntityBuilder.badRequest("not found query id");
+
+        // Not found in all FE.
+        throw new Exception(String.format("trace id %s not found", traceId));
     }
 
     /**
@@ -505,4 +529,69 @@ public class QueryProfileAction extends RestBaseController 
{
         env.getScheduler().cancelQuery(queryId, new 
Status(TStatusCode.CANCELLED, "cancel query by rest api"));
         return ResponseEntityBuilder.ok();
     }
+
+    /**
+     * Get real-time query statistics for the query with the given trace id.
+     * This API is used for getting the runtime query progress.
+     *
+     * @param request
+     * @param response
+     * @param traceId: The user specified trace id, eg, set 
session_context="trace_id:123456";
+     * @return
+     */
+    @RequestMapping(path = "/statistics/{trace_id}", method = 
RequestMethod.GET)
+    public Object queryStatistics(HttpServletRequest request, 
HttpServletResponse response,
+            @PathVariable("trace_id") String traceId) {
+        executeCheckPassword(request, response);
+
+        String queryId = null;
+        try {
+            queryId = getQueryIdByTraceIdImpl(request, traceId, true);
+        } catch (Exception e) {
+            return ResponseEntityBuilder.badRequest(e.getMessage());
+        }
+
+        try {
+            TQueryStatistics statistic = 
ProfileManager.getInstance().getQueryStatistic(queryId);
+            return ResponseEntityBuilder.ok(new QueryStatistics(statistic));
+        } catch (Exception e) {
+            LOG.warn("get query statistics error, queryId:{}", queryId, e);
+            return ResponseEntityBuilder.badRequest(e.getMessage());
+        }
+    }
+
+    /**
+     * A class that represents the query runtime statistics.
+     */
+    @Getter
+    @Setter
+    public static class QueryStatistics {
+        public long scanRows;
+        public long scanBytes;
+        public long returnedRows;
+        public long cpuMs;
+        public long maxPeakMemoryBytes;
+        public long currentUsedMemoryBytes;
+        public long shuffleSendBytes;
+        public long shuffleSendRows;
+        public long scanBytesFromLocalStorage;
+        public long scanBytesFromRemoteStorage;
+        public long spillWriteBytesToLocalStorage;
+        public long spillReadBytesFromLocalStorage;
+
+        public QueryStatistics(TQueryStatistics queryStatistics) {
+            this.scanRows = queryStatistics.getScanRows();
+            this.scanBytes = queryStatistics.getScanBytes();
+            this.returnedRows = queryStatistics.getReturnedRows();
+            this.cpuMs = queryStatistics.getCpuMs();
+            this.maxPeakMemoryBytes = queryStatistics.getMaxPeakMemoryBytes();
+            this.currentUsedMemoryBytes = 
queryStatistics.getCurrentUsedMemoryBytes();
+            this.shuffleSendBytes = queryStatistics.getShuffleSendBytes();
+            this.shuffleSendRows = queryStatistics.getShuffleSendRows();
+            this.scanBytesFromLocalStorage = 
queryStatistics.getScanBytesFromLocalStorage();
+            this.scanBytesFromRemoteStorage = 
queryStatistics.getScanBytesFromRemoteStorage();
+            this.spillWriteBytesToLocalStorage = 
queryStatistics.getSpillWriteBytesToLocalStorage();
+            this.spillReadBytesFromLocalStorage = 
queryStatistics.getSpillReadBytesFromLocalStorage();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java
index 2fb33c45bf6..34bb18fe5e0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java
@@ -17,10 +17,9 @@
 
 package org.apache.doris.nereids.trees.plans.commands;
 
-import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.SchemaTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.proc.FrontendsProcNode;
@@ -47,21 +46,14 @@ import java.util.Optional;
  */
 public class ShowProcessListCommand extends ShowCommand {
     private static final Logger LOG = 
LogManager.getLogger(ShowProcessListCommand.class);
-    private static final ShowResultSetMetaData PROCESSLIST_META_DATA = 
ShowResultSetMetaData.builder()
-            .addColumn(new Column("CurrentConnected", 
ScalarType.createVarchar(16)))
-            .addColumn(new Column("Id", 
ScalarType.createType(PrimitiveType.BIGINT)))
-            .addColumn(new Column("User", ScalarType.createVarchar(16)))
-            .addColumn(new Column("Host", ScalarType.createVarchar(16)))
-            .addColumn(new Column("LoginTime", ScalarType.createVarchar(16)))
-            .addColumn(new Column("Catalog", ScalarType.createVarchar(16)))
-            .addColumn(new Column("Db", ScalarType.createVarchar(16)))
-            .addColumn(new Column("Command", ScalarType.createVarchar(16)))
-            .addColumn(new Column("Time", 
ScalarType.createType(PrimitiveType.INT)))
-            .addColumn(new Column("State", ScalarType.createVarchar(64)))
-            .addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
-            .addColumn(new Column("Info", ScalarType.STRING))
-            .addColumn(new Column("FE", ScalarType.createVarchar(16)))
-            .addColumn(new Column("CloudCluster", 
ScalarType.createVarchar(16))).build();
+    private static final ShowResultSetMetaData PROCESSLIST_META_DATA;
+
+    static {
+        Table tbl = SchemaTable.TABLE_MAP.get("processlist");
+        ShowResultSetMetaData.Builder builder = 
ShowResultSetMetaData.builder();
+        tbl.getBaseSchema().stream().forEach(column -> 
builder.addColumn(column));
+        PROCESSLIST_META_DATA = builder.build();
+    }
 
     private final boolean isFull;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index c980de73b5a..78efb8a2d74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -949,6 +949,10 @@ public class ConnectContext {
     }
 
     public void setTraceId(String traceId) {
+        // When traceId is set, we need to remove the old traceId from 
connectScheduler.
+        if (connectScheduler != null) {
+            connectScheduler.removeOldTraceId(this.traceId);
+        }
         this.traceId = traceId;
     }
 
@@ -1225,6 +1229,7 @@ public class ConnectContext {
             row.add("" + (nowMs - startTime) / 1000);
             row.add(state.toString());
             row.add(DebugUtil.printId(queryId));
+            row.add(Strings.nullToEmpty(traceId));
             if (state.getStateType() == QueryState.MysqlStateType.ERR) {
                 row.add(state.getErrorMessage());
             } else if (executor != null) {
@@ -1247,7 +1252,6 @@ public class ConnectContext {
         }
     }
 
-
     public void startAcceptQuery(ConnectProcessor connectProcessor) {
         mysqlChannel.startAcceptQuery(this, connectProcessor);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
index ce91ef09e37..e0f311aec85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
@@ -84,6 +84,9 @@ public class ConnectPoolMgr {
             if (conns != null) {
                 conns.decrementAndGet();
             }
+            if (ctx.traceId() != null) {
+                traceId2QueryId.remove(ctx.traceId());
+            }
             numberConnection.decrementAndGet();
         }
     }
@@ -155,6 +158,12 @@ public class ConnectPoolMgr {
         return queryId == null ? "" : DebugUtil.printId(queryId);
     }
 
+    public void removeTraceId(String traceId) {
+        if (traceId != null) {
+            traceId2QueryId.remove(traceId);
+        }
+    }
+
     public Map<Integer, ConnectContext> getConnectionMap() {
         return connectionMap;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index c8d27a23db6..32ea481fa9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -138,6 +138,11 @@ public class ConnectScheduler {
         return queryId;
     }
 
+    public void removeOldTraceId(String traceId) {
+        connectPoolMgr.removeTraceId(traceId);
+        flightSqlConnectPoolMgr.removeTraceId(traceId);
+    }
+
     public Map<Integer, ConnectContext> getConnectionMap() {
         Map<Integer, ConnectContext> map = Maps.newConcurrentMap();
         map.putAll(connectPoolMgr.getConnectionMap());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
index c9266cf6add..0fbcb5690fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java
@@ -403,7 +403,7 @@ public class CoordinatorContext {
             List<AssignedJob> instanceJobs = pipelinePlan.getInstanceJobs();
             for (AssignedJob instanceJob : instanceJobs) {
                 DistributedPlanWorker worker = instanceJob.getAssignedWorker();
-                backends.put(new TNetworkAddress(worker.address(), 
worker.port()), worker.id());
+                backends.put(new TNetworkAddress(worker.host(), 
worker.port()), worker.id());
             }
         }
         return backends;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
index 1c576053113..84bc45b19f4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
@@ -102,7 +102,7 @@ public class ConnectContextTest {
         // Thread info
         Assert.assertNotNull(ctx.toThreadInfo(false));
         List<String> row = ctx.toThreadInfo(false).toRow(101, 1000, 
Optional.empty());
-        Assert.assertEquals(14, row.size());
+        Assert.assertEquals(15, row.size());
         Assert.assertEquals("Yes", row.get(0));
         Assert.assertEquals("101", row.get(1));
         Assert.assertEquals("testUser", row.get(2));
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 16d1afda43e..6754e5f9f9f 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -344,11 +344,14 @@ struct TPublishTopicResult {
 struct TGetRealtimeExecStatusRequest {
     // maybe query id or other unique id
     1: optional Types.TUniqueId id
+    2: optional string req_type // "stats" or "profile"
 }
 
 struct TGetRealtimeExecStatusResponse {
     1: optional Status.TStatus status
     2: optional FrontendService.TReportExecStatusParams 
report_exec_status_params
+    // query_stats is for getting real-time query statistics of a certain query
+    3: optional FrontendService.TQueryStatistics query_stats
 }
 
 struct TDictionaryStatus {
diff --git a/regression-test/plugins/plugin_curl_requester.groovy 
b/regression-test/plugins/plugin_curl_requester.groovy
index 05c43bbeebf..6fb76589fff 100644
--- a/regression-test/plugins/plugin_curl_requester.groovy
+++ b/regression-test/plugins/plugin_curl_requester.groovy
@@ -112,7 +112,7 @@ Suite.metaClass.http_client = { String method, String url 
/* param */ ->
 
 logger.info("Added 'http_client' function to Suite")
 
-Suite.metaClass.curl = { String method, String url, String body = null, 
Integer timeoutSec = 10 /* param */->
+Suite.metaClass.curl = { String method, String url, String body = null, 
Integer timeoutSec = 10, String user = "", String pwd = ""->
     Suite suite = delegate as Suite
     if (method != "GET" && method != "POST") {
         throw new Exception(String.format("invalid curl method: %s", method))
@@ -125,11 +125,15 @@ Suite.metaClass.curl = { String method, String url, 
String body = null, Integer
     Integer retryCount = 0; // Current retry count
     Integer sleepTime = 5000; // Sleep time in milliseconds
 
+    String auth = "";
+    if (!user.equals("")) {
+        auth = String.format("-u%s:%s", user, pwd).toString();
+    }
     String cmd
     if (method == "POST" && body != null) {
-        cmd = String.format("curl --max-time %d -X %s -H 
Content-Type:application/json -d %s %s", timeoutSec, method, body, 
url).toString()
+        cmd = String.format("curl %s --max-time %d -X %s -H 
Content-Type:application/json -d %s %s", auth, timeoutSec, method, body, 
url).toString()
     } else {
-        cmd = String.format("curl --max-time %d -X %s %s", timeoutSec, method, 
url).toString()
+        cmd = String.format("curl %s --max-time %d -X %s %s", auth, 
timeoutSec, method, url).toString()
     }
 
     logger.info("curl cmd: " + cmd)
diff --git 
a/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
 
b/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
index aa312ac0b0c..37e7c3c4bbe 100644
--- 
a/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
+++ 
b/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
@@ -63,10 +63,10 @@ suite("test_single_compaction_with_variant_inverted", "p2, 
nonConcurrent") {
         out = process.getText()
         logger.info("Run compaction: code=" + code + ", out=" + out + ", 
disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
         if (!disableAutoCompaction) {
-            return "Success, " + out
+            return "Success, " + out2
         }
-        assertEquals(code, 0)
-        return out
+        assertEquals(code2, 0)
+        return out2
     } 
 
     def triggerSingleCompaction = { be_host, be_http_port, tablet_id ->
@@ -84,10 +84,10 @@ suite("test_single_compaction_with_variant_inverted", "p2, 
nonConcurrent") {
         out = process.getText()
         logger.info("Run compaction: code=" + code + ", out=" + out + ", 
disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
         if (!disableAutoCompaction) {
-            return "Success, " + out
+            return "Success, " + out3
         }
-        assertEquals(code, 0)
-        return out
+        assertEquals(code3, 0)
+        return out3
     }
     def waitForCompaction = { be_host, be_http_port, tablet_id ->
         boolean running = true
diff --git 
a/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy
 
b/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy
index 17b42303f7f..3e3e168c054 100644
--- 
a/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy
+++ 
b/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy
@@ -88,7 +88,7 @@ suite("test_information_schema_timezone", 
"p0,external,hive,kerberos,external_do
 
     // 4. processlist
     List<List<Object>> processlist_res_1 = sql """ 
-            select LOGIN_TIME from information_schema.processlist where INFO 
like "%information_schema.processlist%"
+            select LOGINTIME from information_schema.processlist where INFO 
like "%information_schema.processlist%"
             """
     logger.info("processlist_res_1 = " + processlist_res_1);
     
@@ -171,7 +171,7 @@ suite("test_information_schema_timezone", 
"p0,external,hive,kerberos,external_do
 
     // 4. processlist
     List<List<Object>> processlist_res_2 = sql """ 
-            select LOGIN_TIME from information_schema.processlist where INFO 
like "%information_schema.processlist%"
+            select LOGINTIME from information_schema.processlist where INFO 
like "%information_schema.processlist%"
             """
     logger.info("processlist_res_2 = " + processlist_res_2);
     assertEquals(true, isEightHoursDiff(processlist_res_1[0][0], 
processlist_res_2[0][0]))
diff --git a/regression-test/suites/manager/test_manager_interface_5.groovy 
b/regression-test/suites/manager/test_manager_interface_5.groovy
new file mode 100644
index 00000000000..5c44ae3d2d4
--- /dev/null
+++ b/regression-test/suites/manager/test_manager_interface_5.groovy
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+import java.time.LocalDateTime
+import java.time.Duration
+import java.time.format.DateTimeFormatter
+
+suite('test_manager_interface_5',"p0") {
+    // /rest/v2/manager/query/trace_id/{trace_id}
+    // /rest/v2/manager/query/statistics/{trace_id}
+    def test_trace_id = {
+        def futures = []
+        futures.add( thread {
+            try{
+                sql """set 
session_context="trace_id:test_manager_interface_5_trace_id""""
+                sql """ select *  from numbers("number" = "9910") as  a  join 
numbers('number'="18880094567") as b on a.number = b.number; """
+            }catch(Exception e){
+                
+            }
+        })
+
+        futures.add( thread {
+            // test trace id in processlist
+            sleep(500);
+            List<List<Object>> result = sql_return_maparray """  show 
processlist  """
+            def queryid = ""
+            def x = 0
+            logger.info("result = ${result}")
+            for( int i =0 ;i < result.size();i++ ){
+                if (result[i]["Info"].contains("18880094567")) {
+                    queryid = result[i]["QueryId"]
+                    
assertTrue(result[i]["TraceId"].equals("test_manager_interface_5_trace_id"))
+                    x = 1
+                    break;
+                }
+            }
+            assertTrue(x == 1)
+
+            result = sql_return_maparray """  select * from 
information_schema.processlist  """
+            def queryid2 = ""
+            def x2 = 0
+            logger.info("result = ${result}")
+            for( int i =0 ;i < result.size();i++ ){
+                if (result[i]["Info"].contains("18880094567")) {
+                    queryid2 = result[i]["QueryId"]
+                    
assertTrue(result[i]["TraceId"].equals("test_manager_interface_5_trace_id"))
+                    x2 = 1
+                    break;
+                }
+            }
+            assertTrue(x2 == 1)
+
+            // test trace id in cancel query and get realtime query statistics
+            // 1. test get query id by trace id
+            def fes = sql_return_maparray "show frontends"
+            logger.info("frontends: ${fes}")
+            def fe = fes[0]
+            def url = 
"http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/query/trace_id/test_manager_interface_5_trace_id?is_all_node=true";
+            def (code, out, err) = curl("GET", url, null, 10, 
context.config.jdbcUser, context.config.jdbcPassword)
+            println "${out}"
+            assertTrue(code == 0)
+            def getQueryId = parseJson(out).get("data");
+            assertEquals(queryid2, getQueryId);
+
+            // 2. test get realtime query statistics by trace id
+            url = 
"http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/query/statistics/test_manager_interface_5_trace_id?is_all_node=true";
+            (code, out, err) = curl("GET", url, null, 10, 
context.config.jdbcUser, context.config.jdbcPassword)
+            println "${out}"
+            assertTrue(code == 0)
+            def stats = parseJson(out).get("data");
+            assertTrue(stats.containsKey("cpuMs"));
+
+            // 3. test cancel query by query id
+            url = 
"http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/query/kill/${queryid2}?is_all_node=true";
+            (code, out, err) = curl("POST", url, null, 10, 
context.config.jdbcUser, context.config.jdbcPassword)
+            println "${out}"
+            assertTrue(code == 0)
+        })
+        futures.each { it.get() }
+    }
+    test_trace_id();
+}
diff --git a/regression-test/suites/show_p0/test_show_processlist.groovy 
b/regression-test/suites/show_p0/test_show_processlist.groovy
index b093261e7b2..e34db3b2524 100644
--- a/regression-test/suites/show_p0/test_show_processlist.groovy
+++ b/regression-test/suites/show_p0/test_show_processlist.groovy
@@ -21,20 +21,39 @@ suite("test_show_processlist") {
     sql """set show_all_fe_connection = false;"""
     def result = sql """show processlist;"""
     logger.info("result:${result}")
-    assertTrue(result[0].size() == 14)
+    assertTrue(result[0].size() == 15)
     sql """set show_all_fe_connection = true;"""
     result = sql """show processlist;"""
     logger.info("result:${result}")
-    assertTrue(result[0].size() == 14)
+    assertTrue(result[0].size() == 15)
     sql """set show_all_fe_connection = false;"""
 
     def url1 = "http://${context.config.feHttpAddress}/rest/v1/session";
     result =  Http.GET(url1, true)
     logger.info("result:${result}")
-    assertTrue(result["data"]["column_names"].size() == 14);
+    assertTrue(result["data"]["column_names"].size() == 15);
 
     def url2 = "http://${context.config.feHttpAddress}/rest/v1/session/all";
     result = Http.GET(url2, true)
     logger.info("result:${result}")
-    assertTrue(result["data"]["column_names"].size() == 14);
-}
\ No newline at end of file
+    assertTrue(result["data"]["column_names"].size() == 15);
+
+    result = sql """select * from information_schema.processlist"""
+    logger.info("result:${result}")
+    assertTrue(result[0].size() == 15)
+
+    
+    def result1 = connect('root', context.config.jdbcPassword, 
context.config.jdbcUrl) {
+        // execute sql with admin user
+        sql 'select 99 + 1'
+        sql 'set session_context="trace_id:test_show_processlist_trace_id"'
+        def result2 = sql """select * from information_schema.processlist"""
+        def found = false;
+        for (def row in result2) {
+            if (row[11].equals("test_show_processlist_trace_id")) {
+                found = true;
+            }
+        }
+        assertTrue(found)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to