This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new af68d8b5077 [feat](profile) support getting query progress (#51400) af68d8b5077 is described below commit af68d8b5077782d65ec9cadba479ed58b8789e90 Author: Mingyu Chen (Rayner) <morning...@163.com> AuthorDate: Sat Jun 21 00:39:30 2025 +0800 [feat](profile) support getting query progress (#51400) ### What problem does this PR solve? Followup #50791 Add a new FE HTTP API: `/rest/v2/manager/query/statistics/{trace_id}`. This API will return the query runtime statistics corresponding to a given trace id. The query statistics include info such as real-time scan rows/bytes. Internally, Doris will get the query id by trace id from all Frontends, and then fetch the query statistics from BE. Use pattern: 1. User sets a custom trace id by: `set session_context="trace_id:my_trace_id"` 2. User executes a query in the same session 3. Start an HTTP client to get query statistics in real-time during the query process.  Also fix a bug in `CoordinatorContext.java` (introduced by #41730) so that it gets the real host. This PR also changes the column names of the `information_schema.processlist` table to be the same as the column names in `show processlist`. 
--- .../schema_scanner/schema_processlist_scanner.cpp | 39 +++--- be/src/runtime/fragment_mgr.cpp | 9 ++ be/src/runtime/fragment_mgr.h | 2 + be/src/runtime/runtime_query_statistics_mgr.cpp | 13 ++ be/src/runtime/runtime_query_statistics_mgr.h | 3 +- be/src/service/backend_service.cpp | 25 ++-- .../java/org/apache/doris/catalog/SchemaTable.java | 32 ++--- .../doris/common/profile/ProfileManager.java | 68 +++++++++-- .../doris/httpv2/controller/SessionController.java | 18 +-- .../doris/httpv2/rest/manager/HttpUtils.java | 6 + .../httpv2/rest/manager/QueryProfileAction.java | 133 +++++++++++++++++---- .../plans/commands/ShowProcessListCommand.java | 28 ++--- .../java/org/apache/doris/qe/ConnectContext.java | 6 +- .../java/org/apache/doris/qe/ConnectPoolMgr.java | 9 ++ .../java/org/apache/doris/qe/ConnectScheduler.java | 5 + .../org/apache/doris/qe/CoordinatorContext.java | 2 +- .../org/apache/doris/qe/ConnectContextTest.java | 2 +- gensrc/thrift/BackendService.thrift | 3 + .../plugins/plugin_curl_requester.groovy | 10 +- ...e_compaction_with_variant_inverted_index.groovy | 12 +- .../test_information_schema_timezone.groovy | 4 +- .../suites/manager/test_manager_interface_5.groovy | 99 +++++++++++++++ .../suites/show_p0/test_show_processlist.groovy | 29 ++++- 23 files changed, 438 insertions(+), 119 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp index 7acb82ea849..69367556767 100644 --- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp @@ -33,20 +33,21 @@ namespace doris { #include "common/compile_check_begin.h" std::vector<SchemaScanner::ColumnDesc> SchemaProcessListScanner::_s_processlist_columns = { - {"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false}, - {"ID", TYPE_LARGEINT, sizeof(int128_t), false}, - {"USER", TYPE_VARCHAR, sizeof(StringRef), false}, - {"HOST", TYPE_VARCHAR, 
sizeof(StringRef), false}, - {"LOGIN_TIME", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false}, - {"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false}, - {"DB", TYPE_VARCHAR, sizeof(StringRef), false}, - {"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false}, - {"TIME", TYPE_INT, sizeof(int32_t), false}, - {"STATE", TYPE_VARCHAR, sizeof(StringRef), false}, - {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false}, - {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}, - {"FE", TYPE_VARCHAR, sizeof(StringRef), false}, - {"CLOUD_CLUSTER", TYPE_VARCHAR, sizeof(StringRef), false}}; + {"CurrentConnected", TYPE_VARCHAR, sizeof(StringRef), false}, // 0 + {"Id", TYPE_LARGEINT, sizeof(int128_t), false}, // 1 + {"User", TYPE_VARCHAR, sizeof(StringRef), false}, // 2 + {"Host", TYPE_VARCHAR, sizeof(StringRef), false}, // 3 + {"LoginTime", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false}, // 4 + {"Catalog", TYPE_VARCHAR, sizeof(StringRef), false}, // 5 + {"Db", TYPE_VARCHAR, sizeof(StringRef), false}, // 6 + {"Command", TYPE_VARCHAR, sizeof(StringRef), false}, // 7 + {"Time", TYPE_INT, sizeof(int32_t), false}, // 8 + {"State", TYPE_VARCHAR, sizeof(StringRef), false}, // 9 + {"QueryId", TYPE_VARCHAR, sizeof(StringRef), false}, // 10 + {"TraceId", TYPE_VARCHAR, sizeof(StringRef), false}, // 11 + {"Info", TYPE_VARCHAR, sizeof(StringRef), false}, // 12 + {"FE", TYPE_VARCHAR, sizeof(StringRef), false}, // 13 + {"CloudCluster", TYPE_VARCHAR, sizeof(StringRef), false}}; // 14 SchemaProcessListScanner::SchemaProcessListScanner() : SchemaScanner(_s_processlist_columns, TSchemaTableType::SCH_PROCESSLIST) {} @@ -62,6 +63,16 @@ Status SchemaProcessListScanner::start(RuntimeState* state) { TShowProcessListResult tmp_ret; RETURN_IF_ERROR( SchemaHelper::show_process_list(fe_addr.hostname, fe_addr.port, request, &tmp_ret)); + + // Check and adjust the number of columns in each row to ensure 15 columns + // This is compatible with newly added column "trace id". 
#51400 + for (auto& row : tmp_ret.process_list) { + if (row.size() == 14) { + // Insert an empty string at position 11 (index 11) for the TRACE_ID column + row.insert(row.begin() + 11, ""); + } + } + _process_list_result.process_list.insert(_process_list_result.process_list.end(), tmp_ret.process_list.begin(), tmp_ret.process_list.end()); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 230cb12a110..90ec55ac0b5 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1382,4 +1382,13 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id, return Status::OK(); } +Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats) { + if (query_stats == nullptr) { + return Status::InvalidArgument("query_stats is nullptr"); + } + + return ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_query_statistics( + print_id(query_id), query_stats); +} + } // namespace doris diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 4eab31ff3a2..e6f388f73e1 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -184,6 +184,8 @@ public: Status get_realtime_exec_status(const TUniqueId& query_id, TReportExecStatusParams* exec_status); + // get the query statistics of with a given query id + Status get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats); std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id); diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 79db2cfec56..772925dcb7e 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -511,6 +511,19 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo } } +Status RuntimeQueryStatisticsMgr::get_query_statistics(const std::string& query_id, + TQueryStatistics* 
query_stats) { + std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock); + + auto resource_ctx = _resource_contexts_map.find(query_id); + if (resource_ctx == _resource_contexts_map.end()) { + return Status::InternalError("failed to find query with id {}", query_id); + } + + resource_ctx->second->to_thrift_query_statistics(query_stats); + return Status::OK(); +} + void RuntimeQueryStatisticsMgr::get_tasks_resource_context( std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs) { std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock); diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index ef80b6fa5b9..6b2e4545390 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -55,6 +55,7 @@ public: // used for backend_active_tasks void get_active_be_tasks_block(vectorized::Block* block); + Status get_query_statistics(const std::string& query_id, TQueryStatistics* query_stats); // used for MemoryReclamation void get_tasks_resource_context(std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs); @@ -95,4 +96,4 @@ private: std::unique_ptr<ThreadPool> _thread_pool; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 114d99fe264..33fa4401391 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -1321,19 +1321,28 @@ void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse std::unique_ptr<TReportExecStatusParams> report_exec_status_params = std::make_unique<TReportExecStatusParams>(); - Status st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status( - request.id, report_exec_status_params.get()); + std::unique_ptr<TQueryStatistics> query_stats = std::make_unique<TQueryStatistics>(); - if (!st.ok()) { - 
response.__set_status(st.to_thrift()); - return; + std::string req_type = request.__isset.req_type ? request.req_type : "profile"; + Status st; + if (req_type == "stats") { + st = ExecEnv::GetInstance()->fragment_mgr()->get_query_statistics(request.id, + query_stats.get()); + if (st.ok()) { + response.__set_query_stats(*query_stats); + } + } else { + // default is "profile" + st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status( + request.id, report_exec_status_params.get()); + if (st.ok()) { + response.__set_report_exec_status_params(*report_exec_status_params); + } } report_exec_status_params->__set_query_id(TUniqueId()); report_exec_status_params->__set_done(false); - - response.__set_status(Status::OK().to_thrift()); - response.__set_report_exec_status_params(*report_exec_status_params); + response.__set_status(st.to_thrift()); } void BaseBackendService::get_dictionary_status(TDictionaryStatusList& result, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 3d7f9603d97..ca42d7af40f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -523,22 +523,24 @@ public class SchemaTable extends Table { .column("IS_MUTABLE", ScalarType.createType(PrimitiveType.BOOLEAN)) .build())) .put("processlist", + // ATTN, the column name should be compatible with MySQL + // See: https://dev.mysql.com/doc/refman/8.4/en/show-processlist.html new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA, - builder().column("CURRENT_CONNECTED", ScalarType.createVarchar(16)) - .column("ID", ScalarType.createType(PrimitiveType.LARGEINT)) - .column("USER", ScalarType.createVarchar(32)) - .column("HOST", ScalarType.createVarchar(261)) - .column("LOGIN_TIME", ScalarType.createType(PrimitiveType.DATETIMEV2)) - .column("CATALOG", 
ScalarType.createVarchar(64)) - .column("DB", ScalarType.createVarchar(64)) - .column("COMMAND", ScalarType.createVarchar(16)) - .column("TIME", ScalarType.createType(PrimitiveType.INT)) - .column("STATE", ScalarType.createVarchar(64)) - .column("QUERY_ID", ScalarType.createVarchar(256)) - .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) - .column("FE", - ScalarType.createVarchar(64)) - .column("CLOUD_CLUSTER", ScalarType.createVarchar(64)).build(), true)) + builder().column("CurrentConnected", ScalarType.createVarchar(16)) + .column("Id", ScalarType.createType(PrimitiveType.LARGEINT)) + .column("User", ScalarType.createVarchar(32)) + .column("Host", ScalarType.createVarchar(261)) + .column("LoginTime", ScalarType.createType(PrimitiveType.DATETIMEV2)) + .column("Catalog", ScalarType.createVarchar(64)) + .column("Db", ScalarType.createVarchar(64)) + .column("Command", ScalarType.createVarchar(16)) + .column("Time", ScalarType.createType(PrimitiveType.INT)) + .column("State", ScalarType.createVarchar(64)) + .column("QueryId", ScalarType.createVarchar(256)) + .column("TraceId", ScalarType.createVarchar(256)) + .column("Info", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) + .column("FE", ScalarType.createVarchar(64)) + .column("CloudCluster", ScalarType.createVarchar(64)).build(), true)) .put("workload_policy", new SchemaTable(SystemIdGenerator.getNextId(), "workload_policy", TableType.SCHEMA, builder().column("ID", ScalarType.createType(PrimitiveType.BIGINT)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java index 5666ca8965a..36aa9a68b25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java @@ -32,9 +32,11 @@ import org.apache.doris.thrift.BackendService; import 
org.apache.doris.thrift.TGetRealtimeExecStatusRequest; import org.apache.doris.thrift.TGetRealtimeExecStatusResponse; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -253,7 +255,7 @@ public class ProfileManager extends MasterDaemon { } private static TGetRealtimeExecStatusResponse getRealtimeQueryProfile( - TUniqueId queryID, TNetworkAddress targetBackend) { + TUniqueId queryID, String reqType, TNetworkAddress targetBackend) { TGetRealtimeExecStatusResponse resp = null; BackendService.Client client = null; @@ -268,6 +270,7 @@ public class ProfileManager extends MasterDaemon { try { TGetRealtimeExecStatusRequest req = new TGetRealtimeExecStatusRequest(); req.setId(queryID); + req.setReqType(reqType); resp = client.getRealtimeExecStatus(req); } catch (TException e) { LOG.warn("Got exception when getRealtimeExecStatus, query {} backend {}", @@ -293,8 +296,8 @@ public class ProfileManager extends MasterDaemon { return null; } - if (!resp.isSetReportExecStatusParams()) { - LOG.warn("Invalid GetRealtimeExecStatusResponse, query {}", + if (!resp.isSetReportExecStatusParams() && !resp.isSetQueryStats()) { + LOG.warn("Invalid GetRealtimeExecStatusResponse, missing both exec status and query stats. 
query {}", DebugUtil.printId(queryID)); return null; } @@ -302,7 +305,7 @@ public class ProfileManager extends MasterDaemon { return resp; } - private List<Future<TGetRealtimeExecStatusResponse>> createFetchRealTimeProfileTasks(String id) { + private List<Future<TGetRealtimeExecStatusResponse>> createFetchRealTimeProfileTasks(String id, String reqType) { // For query, id is queryId, for load, id is LoadLoadingTaskId class QueryIdAndAddress { public TUniqueId id; @@ -365,9 +368,8 @@ public class ProfileManager extends MasterDaemon { } for (QueryIdAndAddress idAndAddress : involvedBackends) { - Callable<TGetRealtimeExecStatusResponse> task = () -> { - return getRealtimeQueryProfile(idAndAddress.id, idAndAddress.beAddress); - }; + Callable<TGetRealtimeExecStatusResponse> task = () -> getRealtimeQueryProfile(idAndAddress.id, + reqType, idAndAddress.beAddress); Future<TGetRealtimeExecStatusResponse> future = fetchRealTimeProfileExecutor.submit(task); futures.add(future); } @@ -375,8 +377,57 @@ public class ProfileManager extends MasterDaemon { return futures; } + public TQueryStatistics getQueryStatistic(String queryId) throws Exception { + List<Future<TGetRealtimeExecStatusResponse>> futures = createFetchRealTimeProfileTasks(queryId, + "stats"); + List<TQueryStatistics> queryStatisticsList = Lists.newArrayList(); + for (Future<TGetRealtimeExecStatusResponse> future : futures) { + try { + TGetRealtimeExecStatusResponse resp = future.get(5, TimeUnit.SECONDS); + if (resp != null && resp.getStatus().status_code == TStatusCode.OK && resp.isSetQueryStats()) { + queryStatisticsList.add(resp.getQueryStats()); + } else { + LOG.warn("Failed to get real-time query stats, id {}, resp is {}", + queryId, resp == null ? 
"null" : resp.toString()); + throw new Exception("Failed to get realtime query stats: " + resp.toString()); + } + } catch (Exception e) { + LOG.warn("Failed to get real-time query stats, id {}, error: {}", queryId, e.getMessage(), e); + throw new Exception("Failed to get realtime query stats: " + e.getMessage()); + } + } + Preconditions.checkState(!queryStatisticsList.isEmpty() && queryStatisticsList.size() == futures.size(), + String.format("Failed to get real-time stats, id %s, " + + "queryStatisticsList size %d != futures size %d", + queryId, queryStatisticsList.size(), futures.size())); + + TQueryStatistics summary = new TQueryStatistics(); + for (TQueryStatistics queryStats : queryStatisticsList) { + // sum all the statistics + summary.setScanRows(summary.getScanRows() + queryStats.getScanRows()); + summary.setScanBytes(summary.getScanBytes() + queryStats.getScanBytes()); + summary.setReturnedRows(summary.getReturnedRows() + queryStats.getReturnedRows()); + summary.setCpuMs(summary.getCpuMs() + queryStats.getCpuMs()); + summary.setMaxPeakMemoryBytes(Math.max(summary.getMaxPeakMemoryBytes(), + queryStats.getMaxPeakMemoryBytes())); + summary.setCurrentUsedMemoryBytes(Math.max(summary.getCurrentUsedMemoryBytes(), + queryStats.getCurrentUsedMemoryBytes())); + summary.setShuffleSendBytes(summary.getShuffleSendBytes() + queryStats.getShuffleSendBytes()); + summary.setShuffleSendRows(summary.getShuffleSendRows() + queryStats.getShuffleSendRows()); + summary.setScanBytesFromLocalStorage( + summary.getScanBytesFromLocalStorage() + queryStats.getScanBytesFromLocalStorage()); + summary.setScanBytesFromRemoteStorage( + summary.getScanBytesFromRemoteStorage() + queryStats.getScanBytesFromRemoteStorage()); + summary.setSpillWriteBytesToLocalStorage( + summary.getSpillWriteBytesToLocalStorage() + queryStats.getSpillWriteBytesToLocalStorage()); + summary.setSpillReadBytesFromLocalStorage( + summary.getSpillReadBytesFromLocalStorage() + 
queryStats.getSpillReadBytesFromLocalStorage()); + } + return summary; + } + public String getProfile(String id) { - List<Future<TGetRealtimeExecStatusResponse>> futures = createFetchRealTimeProfileTasks(id); + List<Future<TGetRealtimeExecStatusResponse>> futures = createFetchRealTimeProfileTasks(id, "profile"); // beAddr of reportExecStatus of QeProcessorImpl is meaningless, so assign a dummy address // to avoid compile failing. TNetworkAddress dummyAddr = new TNetworkAddress(); @@ -1057,3 +1108,4 @@ public class ProfileManager extends MasterDaemon { } } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java index 8d9f791c0b0..e21e61b4ed9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java @@ -18,6 +18,8 @@ package org.apache.doris.httpv2.controller; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.SchemaTable; +import org.apache.doris.catalog.Table; import org.apache.doris.httpv2.entity.ResponseBody; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.rest.RestBaseController; @@ -56,20 +58,8 @@ public class SessionController extends RestBaseController { private static final Logger LOG = LogManager.getLogger(SessionController.class); static { - SESSION_TABLE_HEADER.add("CurrentConnected"); - SESSION_TABLE_HEADER.add("Id"); - SESSION_TABLE_HEADER.add("User"); - SESSION_TABLE_HEADER.add("Host"); - SESSION_TABLE_HEADER.add("LoginTime"); - SESSION_TABLE_HEADER.add("Catalog"); - SESSION_TABLE_HEADER.add("Db"); - SESSION_TABLE_HEADER.add("Command"); - SESSION_TABLE_HEADER.add("Time"); - SESSION_TABLE_HEADER.add("State"); - SESSION_TABLE_HEADER.add("QueryId"); - SESSION_TABLE_HEADER.add("Info"); - SESSION_TABLE_HEADER.add("FE"); - 
SESSION_TABLE_HEADER.add("CloudCluster"); + Table tbl = SchemaTable.TABLE_MAP.get("processlist"); + tbl.getBaseSchema().stream().forEach(column -> SESSION_TABLE_HEADER.add(column.getName())); } @RequestMapping(path = "/session/all", method = RequestMethod.GET) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java index 8caab8df2d9..4330e4ace56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java @@ -23,6 +23,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.httpv2.entity.ResponseBody; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.system.Frontend; +import org.apache.doris.system.SystemInfoService.HostInfo; import com.google.common.base.Strings; import com.google.gson.reflect.TypeToken; @@ -57,6 +58,11 @@ public class HttpUtils { .collect(Collectors.toList()); } + static boolean isCurrentFe(String ip, int port) { + HostInfo hostInfo = Env.getCurrentEnv().getSelfNode(); + return hostInfo.isSame(new HostInfo(ip, port)); + } + static String concatUrl(Pair<String, Integer> ipPort, String path, Map<String, String> arguments) { StringBuilder url = new StringBuilder("http://") .append(ipPort.first).append(":").append(ipPort.second).append(path); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java index 0886edb56fb..829ffe05b06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java @@ -38,6 +38,7 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; 
import org.apache.doris.service.FrontendOptions; +import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Strings; @@ -45,8 +46,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.reflect.TypeToken; +import lombok.Getter; +import lombok.Setter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; @@ -307,7 +311,31 @@ public class QueryProfileAction extends RestBaseController { @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") boolean isAllNode) { executeCheckPassword(request, response); + try { + String queryId = getQueryIdByTraceIdImpl(request, traceId, isAllNode); + return ResponseEntityBuilder.ok(queryId); + } catch (Exception e) { + return ResponseEntityBuilder.badRequest(e.getMessage()); + } + } + + /** + * Get query id by trace id. + * return a non-empty query id corresponding to the trace id. + * Will throw an exception if the trace id is not found, or query id is empty, or user does not have permission. 
+ */ + private String getQueryIdByTraceIdImpl(HttpServletRequest request, String traceId, boolean isAllNode) + throws Exception { + // Get query id by trace id in current FE + ExecuteEnv env = ExecuteEnv.getInstance(); + String queryId = env.getScheduler().getQueryIdByTraceId(traceId); + if (!Strings.isNullOrEmpty(queryId)) { + checkAuthByUserAndQueryId(queryId); + return queryId; + } + if (isAllNode) { + // If the query id is not found in current FE, try to get it from other FE String httpPath = "/rest/v2/manager/query/trace_id/" + traceId; ImmutableMap<String, String> arguments = ImmutableMap.<String, String>builder().put(IS_ALL_NODE_PARA, "false").build(); @@ -315,33 +343,29 @@ public class QueryProfileAction extends RestBaseController { ImmutableMap<String, String> header = ImmutableMap.<String, String>builder() .put(NodeAction.AUTHORIZATION, request.getHeader(NodeAction.AUTHORIZATION)).build(); for (Pair<String, Integer> ipPort : frontends) { + if (HttpUtils.isCurrentFe(ipPort.first, ipPort.second)) { + // skip current FE. 
+ continue; + } String url = HttpUtils.concatUrl(ipPort, httpPath, arguments); - try { - String responseJson = HttpUtils.doGet(url, header); - int code = JsonParser.parseString(responseJson).getAsJsonObject().get("code").getAsInt(); - if (code == HttpUtils.REQUEST_SUCCESS_CODE) { - return responseJson; + String responseJson = HttpUtils.doGet(url, header); + JsonObject jObj = JsonParser.parseString(responseJson).getAsJsonObject(); + int code = jObj.get("code").getAsInt(); + if (code == HttpUtils.REQUEST_SUCCESS_CODE) { + if (!jObj.has("data") || jObj.get("data").isJsonNull() || Strings.isNullOrEmpty( + jObj.get("data").getAsString())) { + throw new Exception(String.format("trace id %s not found", traceId)); } - } catch (Exception e) { - LOG.warn(e); + return jObj.get("data").getAsString(); } + LOG.warn("get query id by trace id error, resp: {}", responseJson); + // If the response code is not success, it means that the trace id is not found in this FE. + // Continue to try the next FE. } - } else { - ExecuteEnv env = ExecuteEnv.getInstance(); - String queryId = env.getScheduler().getQueryIdByTraceId(traceId); - if (Strings.isNullOrEmpty(queryId)) { - return ResponseEntityBuilder.badRequest("Not found"); - } - - try { - checkAuthByUserAndQueryId(queryId); - } catch (AuthenticationException e) { - return ResponseEntityBuilder.badRequest(e.getMessage()); - } - - return ResponseEntityBuilder.ok(queryId); } - return ResponseEntityBuilder.badRequest("not found query id"); + + // Not found in all FE. + throw new Exception(String.format("trace id %s not found", traceId)); } /** @@ -505,4 +529,69 @@ public class QueryProfileAction extends RestBaseController { env.getScheduler().cancelQuery(queryId, new Status(TStatusCode.CANCELLED, "cancel query by rest api")); return ResponseEntityBuilder.ok(); } + + /** + * Get real-time query statistics for with given query id. 
+ * This API is used for getting the runtime query progress + * + * @param request + * @param response + * @param traceId: The user specified trace id, eg, set session_context="trace_id:123456"; + * @return + */ + @RequestMapping(path = "/statistics/{trace_id}", method = RequestMethod.GET) + public Object queryStatistics(HttpServletRequest request, HttpServletResponse response, + @PathVariable("trace_id") String traceId) { + executeCheckPassword(request, response); + + String queryId = null; + try { + queryId = getQueryIdByTraceIdImpl(request, traceId, true); + } catch (Exception e) { + return ResponseEntityBuilder.badRequest(e.getMessage()); + } + + try { + TQueryStatistics statistic = ProfileManager.getInstance().getQueryStatistic(queryId); + return ResponseEntityBuilder.ok(new QueryStatistics(statistic)); + } catch (Exception e) { + LOG.warn("get query statistics error, queryId:{}", queryId, e); + return ResponseEntityBuilder.badRequest(e.getMessage()); + } + } + + /** + * A class that represents the query runtime statistics. 
+ */ + @Getter + @Setter + public static class QueryStatistics { + public long scanRows; + public long scanBytes; + public long returnedRows; + public long cpuMs; + public long maxPeakMemoryBytes; + public long currentUsedMemoryBytes; + public long shuffleSendBytes; + public long shuffleSendRows; + public long scanBytesFromLocalStorage; + public long scanBytesFromRemoteStorage; + public long spillWriteBytesToLocalStorage; + public long spillReadBytesFromLocalStorage; + + public QueryStatistics(TQueryStatistics queryStatistics) { + this.scanRows = queryStatistics.getScanRows(); + this.scanBytes = queryStatistics.getScanBytes(); + this.returnedRows = queryStatistics.getReturnedRows(); + this.cpuMs = queryStatistics.getCpuMs(); + this.maxPeakMemoryBytes = queryStatistics.getMaxPeakMemoryBytes(); + this.currentUsedMemoryBytes = queryStatistics.getCurrentUsedMemoryBytes(); + this.shuffleSendBytes = queryStatistics.getShuffleSendBytes(); + this.shuffleSendRows = queryStatistics.getShuffleSendRows(); + this.scanBytesFromLocalStorage = queryStatistics.getScanBytesFromLocalStorage(); + this.scanBytesFromRemoteStorage = queryStatistics.getScanBytesFromRemoteStorage(); + this.spillWriteBytesToLocalStorage = queryStatistics.getSpillWriteBytesToLocalStorage(); + this.spillReadBytesFromLocalStorage = queryStatistics.getSpillReadBytesFromLocalStorage(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java index 2fb33c45bf6..34bb18fe5e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowProcessListCommand.java @@ -17,10 +17,9 @@ package org.apache.doris.nereids.trees.plans.commands; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; -import 
org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.SchemaTable; +import org.apache.doris.catalog.Table; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Pair; import org.apache.doris.common.proc.FrontendsProcNode; @@ -47,21 +46,14 @@ import java.util.Optional; */ public class ShowProcessListCommand extends ShowCommand { private static final Logger LOG = LogManager.getLogger(ShowProcessListCommand.class); - private static final ShowResultSetMetaData PROCESSLIST_META_DATA = ShowResultSetMetaData.builder() - .addColumn(new Column("CurrentConnected", ScalarType.createVarchar(16))) - .addColumn(new Column("Id", ScalarType.createType(PrimitiveType.BIGINT))) - .addColumn(new Column("User", ScalarType.createVarchar(16))) - .addColumn(new Column("Host", ScalarType.createVarchar(16))) - .addColumn(new Column("LoginTime", ScalarType.createVarchar(16))) - .addColumn(new Column("Catalog", ScalarType.createVarchar(16))) - .addColumn(new Column("Db", ScalarType.createVarchar(16))) - .addColumn(new Column("Command", ScalarType.createVarchar(16))) - .addColumn(new Column("Time", ScalarType.createType(PrimitiveType.INT))) - .addColumn(new Column("State", ScalarType.createVarchar(64))) - .addColumn(new Column("QueryId", ScalarType.createVarchar(64))) - .addColumn(new Column("Info", ScalarType.STRING)) - .addColumn(new Column("FE", ScalarType.createVarchar(16))) - .addColumn(new Column("CloudCluster", ScalarType.createVarchar(16))).build(); + private static final ShowResultSetMetaData PROCESSLIST_META_DATA; + + static { + Table tbl = SchemaTable.TABLE_MAP.get("processlist"); + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + tbl.getBaseSchema().stream().forEach(column -> builder.addColumn(column)); + PROCESSLIST_META_DATA = builder.build(); + } private final boolean isFull; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index c980de73b5a..78efb8a2d74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -949,6 +949,10 @@ public class ConnectContext { } public void setTraceId(String traceId) { + // When traceId is set, we need to remove the old traceId from connectScheduler. + if (connectScheduler != null) { + connectScheduler.removeOldTraceId(this.traceId); + } this.traceId = traceId; } @@ -1225,6 +1229,7 @@ public class ConnectContext { row.add("" + (nowMs - startTime) / 1000); row.add(state.toString()); row.add(DebugUtil.printId(queryId)); + row.add(Strings.nullToEmpty(traceId)); if (state.getStateType() == QueryState.MysqlStateType.ERR) { row.add(state.getErrorMessage()); } else if (executor != null) { @@ -1247,7 +1252,6 @@ public class ConnectContext { } } - public void startAcceptQuery(ConnectProcessor connectProcessor) { mysqlChannel.startAcceptQuery(this, connectProcessor); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java index ce91ef09e37..e0f311aec85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java @@ -84,6 +84,9 @@ public class ConnectPoolMgr { if (conns != null) { conns.decrementAndGet(); } + if (ctx.traceId() != null) { + traceId2QueryId.remove(ctx.traceId()); + } numberConnection.decrementAndGet(); } } @@ -155,6 +158,12 @@ public class ConnectPoolMgr { return queryId == null ? 
"" : DebugUtil.printId(queryId); } + public void removeTraceId(String traceId) { + if (traceId != null) { + traceId2QueryId.remove(traceId); + } + } + public Map<Integer, ConnectContext> getConnectionMap() { return connectionMap; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index c8d27a23db6..32ea481fa9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -138,6 +138,11 @@ public class ConnectScheduler { return queryId; } + public void removeOldTraceId(String traceId) { + connectPoolMgr.removeTraceId(traceId); + flightSqlConnectPoolMgr.removeTraceId(traceId); + } + public Map<Integer, ConnectContext> getConnectionMap() { Map<Integer, ConnectContext> map = Maps.newConcurrentMap(); map.putAll(connectPoolMgr.getConnectionMap()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java index c9266cf6add..0fbcb5690fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java @@ -403,7 +403,7 @@ public class CoordinatorContext { List<AssignedJob> instanceJobs = pipelinePlan.getInstanceJobs(); for (AssignedJob instanceJob : instanceJobs) { DistributedPlanWorker worker = instanceJob.getAssignedWorker(); - backends.put(new TNetworkAddress(worker.address(), worker.port()), worker.id()); + backends.put(new TNetworkAddress(worker.host(), worker.port()), worker.id()); } } return backends; diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java index 1c576053113..84bc45b19f4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java @@ -102,7 +102,7 @@ public class ConnectContextTest { // Thread info Assert.assertNotNull(ctx.toThreadInfo(false)); List<String> row = ctx.toThreadInfo(false).toRow(101, 1000, Optional.empty()); - Assert.assertEquals(14, row.size()); + Assert.assertEquals(15, row.size()); Assert.assertEquals("Yes", row.get(0)); Assert.assertEquals("101", row.get(1)); Assert.assertEquals("testUser", row.get(2)); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 16d1afda43e..6754e5f9f9f 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -344,11 +344,14 @@ struct TPublishTopicResult { struct TGetRealtimeExecStatusRequest { // maybe query id or other unique id 1: optional Types.TUniqueId id + 2: optional string req_type // "stats" or "profile" } struct TGetRealtimeExecStatusResponse { 1: optional Status.TStatus status 2: optional FrontendService.TReportExecStatusParams report_exec_status_params + // query_stats is for getting real-time query statistics of a certain query + 3: optional FrontendService.TQueryStatistics query_stats } struct TDictionaryStatus { diff --git a/regression-test/plugins/plugin_curl_requester.groovy b/regression-test/plugins/plugin_curl_requester.groovy index 05c43bbeebf..6fb76589fff 100644 --- a/regression-test/plugins/plugin_curl_requester.groovy +++ b/regression-test/plugins/plugin_curl_requester.groovy @@ -112,7 +112,7 @@ Suite.metaClass.http_client = { String method, String url /* param */ -> logger.info("Added 'http_client' function to Suite") -Suite.metaClass.curl = { String method, String url, String body = null, Integer timeoutSec = 10 /* param */-> +Suite.metaClass.curl = { String method, String url, String body = null, Integer timeoutSec = 10, String user = "", String pwd = ""-> Suite suite = delegate as Suite if (method != "GET" && method != "POST") { throw new Exception(String.format("invalid 
curl method: %s", method)) @@ -125,11 +125,15 @@ Suite.metaClass.curl = { String method, String url, String body = null, Integer Integer retryCount = 0; // Current retry count Integer sleepTime = 5000; // Sleep time in milliseconds + String auth = ""; + if (!user.equals("")) { + auth = String.format("-u%s:%s", user, pwd).toString(); + } String cmd if (method == "POST" && body != null) { - cmd = String.format("curl --max-time %d -X %s -H Content-Type:application/json -d %s %s", timeoutSec, method, body, url).toString() + cmd = String.format("curl %s --max-time %d -X %s -H Content-Type:application/json -d %s %s", auth, timeoutSec, method, body, url).toString() } else { - cmd = String.format("curl --max-time %d -X %s %s", timeoutSec, method, url).toString() + cmd = String.format("curl %s --max-time %d -X %s %s", auth, timeoutSec, method, url).toString() } logger.info("curl cmd: " + cmd) diff --git a/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy b/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy index aa312ac0b0c..37e7c3c4bbe 100644 --- a/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy +++ b/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy @@ -63,10 +63,10 @@ suite("test_single_compaction_with_variant_inverted", "p2, nonConcurrent") { out = process.getText() logger.info("Run compaction: code=" + code + ", out=" + out + ", disableAutoCompaction " + disableAutoCompaction + ", err=" + err) if (!disableAutoCompaction) { - return "Success, " + out + return "Success, " + out2 } - assertEquals(code, 0) - return out + assertEquals(code2, 0) + return out2 } def triggerSingleCompaction = { be_host, be_http_port, tablet_id -> @@ -84,10 +84,10 @@ suite("test_single_compaction_with_variant_inverted", "p2, nonConcurrent") { out = process.getText() logger.info("Run compaction: code=" + code + ", out=" + out + 
", disableAutoCompaction " + disableAutoCompaction + ", err=" + err) if (!disableAutoCompaction) { - return "Success, " + out + return "Success, " + out3 } - assertEquals(code, 0) - return out + assertEquals(code3, 0) + return out3 } def waitForCompaction = { be_host, be_http_port, tablet_id -> boolean running = true diff --git a/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy b/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy index 17b42303f7f..3e3e168c054 100644 --- a/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy +++ b/regression-test/suites/external_table_p0/info_schema_db/test_information_schema_timezone.groovy @@ -88,7 +88,7 @@ suite("test_information_schema_timezone", "p0,external,hive,kerberos,external_do // 4. processlist List<List<Object>> processlist_res_1 = sql """ - select LOGIN_TIME from information_schema.processlist where INFO like "%information_schema.processlist%" + select LOGINTIME from information_schema.processlist where INFO like "%information_schema.processlist%" """ logger.info("processlist_res_1 = " + processlist_res_1); @@ -171,7 +171,7 @@ suite("test_information_schema_timezone", "p0,external,hive,kerberos,external_do // 4. 
processlist List<List<Object>> processlist_res_2 = sql """ - select LOGIN_TIME from information_schema.processlist where INFO like "%information_schema.processlist%" + select LOGINTIME from information_schema.processlist where INFO like "%information_schema.processlist%" """ logger.info("processlist_res_2 = " + processlist_res_2); assertEquals(true, isEightHoursDiff(processlist_res_1[0][0], processlist_res_2[0][0])) diff --git a/regression-test/suites/manager/test_manager_interface_5.groovy b/regression-test/suites/manager/test_manager_interface_5.groovy new file mode 100644 index 00000000000..5c44ae3d2d4 --- /dev/null +++ b/regression-test/suites/manager/test_manager_interface_5.groovy @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType + +import java.time.LocalDateTime +import java.time.Duration +import java.time.format.DateTimeFormatter + +suite('test_manager_interface_5',"p0") { + // /rest/v2/manager/query/trace_id/{trace_id} + // /rest/v2/manager/query/trace_id/{trace_id} + def test_trace_id = { + def futures = [] + futures.add( thread { + try{ + sql """set session_context="trace_id:test_manager_interface_5_trace_id"""" + sql """ select * from numbers("number" = "9910") as a join numbers('number'="18880094567") as b on a.number = b.number; """ + }catch(Exception e){ + + } + }) + + futures.add( thread { + // test trace id in processlist + sleep(500); + List<List<Object>> result = sql_return_maparray """ show processlist """ + def queryid = "" + def x = 0 + logger.info("result = ${result}") + for( int i =0 ;i < result.size();i++ ){ + if (result[i]["Info"].contains("18880094567")) { + queryid = result[i]["QueryId"] + assertTrue(result[i]["TraceId"].equals("test_manager_interface_5_trace_id")) + x = 1 + break; + } + } + assertTrue(x == 1) + + result = sql_return_maparray """ select * from information_schema.processlist """ + def queryid2 = "" + def x2 = 0 + logger.info("result = ${result}") + for( int i =0 ;i < result.size();i++ ){ + if (result[i]["Info"].contains("18880094567")) { + queryid2 = result[i]["QueryId"] + assertTrue(result[i]["TraceId"].equals("test_manager_interface_5_trace_id")) + x2 = 1 + break; + } + } + assertTrue(x2 == 1) + + // test trace id in cancel query and get realtime query statistics + // 1. 
test get query id by trace id + def fes = sql_return_maparray "show frontends" + logger.info("frontends: ${fes}") + def fe = fes[0] + def url = "http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/query/trace_id/test_manager_interface_5_trace_id?is_all_node=true" + def (code, out, err) = curl("GET", url, null, 10, context.config.jdbcUser, context.config.jdbcPassword) + println "${out}" + assertTrue(code == 0) + def getQueryId = parseJson(out).get("data"); + assertEquals(queryid2, getQueryId); + + // 2. test get realtime query statistics by trace id + url = "http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/query/statistics/test_manager_interface_5_trace_id?is_all_node=true" + (code, out, err) = curl("GET", url, null, 10, context.config.jdbcUser, context.config.jdbcPassword) + println "${out}" + assertTrue(code == 0) + def stats = parseJson(out).get("data"); + assertTrue(stats.containsKey("cpuMs")); + + // 3. test cancel query by query id + url = "http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/query/kill/${queryid2}?is_all_node=true" + (code, out, err) = curl("POST", url, null, 10, context.config.jdbcUser, context.config.jdbcPassword) + println "${out}" + assertTrue(code == 0) + }) + futures.each { it.get() } + } + test_trace_id(); +} diff --git a/regression-test/suites/show_p0/test_show_processlist.groovy b/regression-test/suites/show_p0/test_show_processlist.groovy index b093261e7b2..e34db3b2524 100644 --- a/regression-test/suites/show_p0/test_show_processlist.groovy +++ b/regression-test/suites/show_p0/test_show_processlist.groovy @@ -21,20 +21,39 @@ suite("test_show_processlist") { sql """set show_all_fe_connection = false;""" def result = sql """show processlist;""" logger.info("result:${result}") - assertTrue(result[0].size() == 14) + assertTrue(result[0].size() == 15) sql """set show_all_fe_connection = true;""" result = sql """show processlist;""" logger.info("result:${result}") - assertTrue(result[0].size() == 14) + assertTrue(result[0].size() == 15) sql 
"""set show_all_fe_connection = false;""" def url1 = "http://${context.config.feHttpAddress}/rest/v1/session" result = Http.GET(url1, true) logger.info("result:${result}") - assertTrue(result["data"]["column_names"].size() == 14); + assertTrue(result["data"]["column_names"].size() == 15); def url2 = "http://${context.config.feHttpAddress}/rest/v1/session/all" result = Http.GET(url2, true) logger.info("result:${result}") - assertTrue(result["data"]["column_names"].size() == 14); -} \ No newline at end of file + assertTrue(result["data"]["column_names"].size() == 15); + + result = sql """select * from information_schema.processlist""" + logger.info("result:${result}") + assertTrue(result[0].size() == 15) + + + def result1 = connect('root', context.config.jdbcPassword, context.config.jdbcUrl) { + // execute sql with admin user + sql 'select 99 + 1' + sql 'set session_context="trace_id:test_show_processlist_trace_id"' + def result2 = sql """select * from information_schema.processlist""" + def found = false; + for (def row in result2) { + if (row[11].equals("test_show_processlist_trace_id")) { + found = true; + } + } + assertTrue(found) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org