This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b1d63402d6c Add QUEUE_START_TIME/QUEUE_END_TIME/QUERY_STATUS column for active_queries (#32259) b1d63402d6c is described below commit b1d63402d6c179dfaea25c244c48e3f761099fa1 Author: wangbo <wan...@apache.org> AuthorDate: Fri Mar 15 22:06:47 2024 +0800 Add QUEUE_START_TIME/QUEUE_END_TIME/QUERY_STATUS column for active_queries (#32259) --- .../schema_active_queries_scanner.cpp | 8 +- .../java/org/apache/doris/catalog/SchemaTable.java | 5 +- .../main/java/org/apache/doris/qe/Coordinator.java | 4 + .../java/org/apache/doris/qe/QeProcessorImpl.java | 29 ++++++- .../org/apache/doris/qe/QueryStatisticsItem.java | 6 +- .../doris/resource/workloadgroup/QueryQueue.java | 6 +- .../doris/resource/workloadgroup/QueueToken.java | 33 +++++++- .../doris/tablefunction/MetadataGenerator.java | 90 +++++++++------------- .../schema_table/test_active_queries.groovy | 6 +- 9 files changed, 123 insertions(+), 64 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp index f16326dc8f5..36cb145e3f5 100644 --- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp @@ -29,11 +29,14 @@ namespace doris { std::vector<SchemaScanner::ColumnDesc> SchemaActiveQueriesScanner::_s_tbls_columns = { // name, type, size {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), true}, - {"START_TIME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"QUERY_START_TIME", TYPE_VARCHAR, sizeof(StringRef), true}, {"QUERY_TIME_MS", TYPE_BIGINT, sizeof(int64_t), true}, {"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), true}, {"DATABASE", TYPE_VARCHAR, sizeof(StringRef), true}, {"FRONTEND_INSTANCE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"QUEUE_START_TIME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"QUEUE_END_TIME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"QUERY_STATUS", TYPE_VARCHAR, sizeof(StringRef), true}, {"SQL", TYPE_STRING, sizeof(StringRef), true}}; SchemaActiveQueriesScanner::SchemaActiveQueriesScanner() @@ -127,6 +130,9 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() { insert_string_value(4, row.column_value[4].stringVal, _active_query_block.get()); insert_string_value(5, row.column_value[5].stringVal, _active_query_block.get()); insert_string_value(6, row.column_value[6].stringVal, _active_query_block.get()); + insert_string_value(7, row.column_value[7].stringVal, _active_query_block.get()); + insert_string_value(8, row.column_value[8].stringVal, _active_query_block.get()); + insert_string_value(9, row.column_value[9].stringVal, _active_query_block.get()); } return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 736df74f1e6..b89b7e1ea6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -460,11 +460,14 @@ public class SchemaTable extends Table { .build())) .put("active_queries", new SchemaTable(SystemIdGenerator.getNextId(), "active_queries", TableType.SCHEMA, builder().column("QUERY_ID", ScalarType.createVarchar(256)) - .column("START_TIME", ScalarType.createVarchar(256)) + .column("QUERY_START_TIME", ScalarType.createVarchar(256)) .column("QUERY_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT)) .column("WORKLOAD_GROUP_ID", ScalarType.createType(PrimitiveType.BIGINT)) .column("DATABASE", ScalarType.createVarchar(256)) .column("FRONTEND_INSTANCE", ScalarType.createVarchar(256)) + .column("QUEUE_START_TIME", ScalarType.createVarchar(256)) + .column("QUEUE_END_TIME", ScalarType.createVarchar(256)) + .column("QUERY_STATUS", ScalarType.createVarchar(256)) .column("SQL", ScalarType.createStringType()) .build())) .put("workload_groups", new SchemaTable(SystemIdGenerator.getNextId(), "workload_groups", TableType.SCHEMA, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 2671228fc3c..7a213e47862 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -3906,6 +3906,10 @@ public class Coordinator implements CoordInterface { } } + public QueueToken getQueueToken() { + return queueToken; + } + // fragment instance exec param, it is used to assemble // the per-instance TPlanFragmentExecParams, as a member of // FragmentExecParams diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 55536639bfd..03144fc797c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -24,6 +24,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.profile.ExecutionProfile; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReportExecStatusParams; @@ -265,7 +266,6 @@ public final class QeProcessorImpl implements QeProcessor { private final ConnectContext connectContext; private final Coordinator coord; private final String sql; - private final long startExecTime; // from Export, Pull load, Insert public QueryInfo(Coordinator coord) { @@ -277,7 +277,6 @@ public final class QeProcessorImpl implements QeProcessor { this.connectContext = connectContext; this.coord = coord; this.sql = sql; - this.startExecTime = System.currentTimeMillis(); } public ConnectContext getConnectContext() { @@ -293,7 +292,31 @@ public final class QeProcessorImpl implements QeProcessor { } public long getStartExecTime() { - return startExecTime; + if (coord.getQueueToken() != null) { + return coord.getQueueToken().getQueueEndTime(); + } + return -1; + } + + public long getQueueStartTime() { + if (coord.getQueueToken() != null) { + return coord.getQueueToken().getQueueStartTime(); + } + return -1; + } + + public long getQueueEndTime() { + if (coord.getQueueToken() != null) { + return coord.getQueueToken().getQueueEndTime(); + } + return -1; + } + + public TokenState getQueueStatus() { + if (coord.getQueueToken() != null) { + return coord.getQueueToken().getTokenState(); + } + return null; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java index 79c3e083113..76b528464d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java @@ -74,7 +74,11 @@ public final class QueryStatisticsItem { public String getQueryExecTime() { final long currentTime = System.currentTimeMillis(); - return String.valueOf(currentTime - queryStartTime); + if (queryStartTime <= 0) { + return String.valueOf(-1); + } else { + return String.valueOf(currentTime - queryStartTime); + } } public String getQueryId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index 7ba6353e746..5953edbf66e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -100,7 +100,6 @@ public class QueryQueue { } public QueueToken getToken() throws UserException { - queueLock.lock(); try { if (LOG.isDebugEnabled()) { @@ -108,13 +107,16 @@ public class QueryQueue { } if (currentRunningQueryNum < maxConcurrency) { currentRunningQueryNum++; - return new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success"); + QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success"); + retToken.setQueueTimeWhenOfferSuccess(); + return retToken; } if (priorityTokenQueue.size() >= maxQueueSize) { throw new UserException("query waiting queue is full, queue length=" + maxQueueSize); } QueueToken newQueryToken = new QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout, "query wait timeout " + queueTimeout + " ms"); + newQueryToken.setQueueTimeWhenQueueSuccess(); this.priorityTokenQueue.offer(newQueryToken); return newQueryToken; } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java index 189ba77e8de..6bf44c78828 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java @@ -36,7 +36,7 @@ public class QueueToken implements Comparable<QueueToken> { return Long.compare(this.tokenId, other.getTokenId()); } - enum TokenState { + public enum TokenState { ENQUEUE_SUCCESS, READY_TO_RUN } @@ -56,6 +56,9 @@ public class QueueToken implements Comparable<QueueToken> { private final ReentrantLock tokenLock = new ReentrantLock(); private final Condition tokenCond = tokenLock.newCondition(); + private long queueStartTime = -1; + private long queueEndTime = -1; + public QueueToken(TokenState tokenState, long queueWaitTimeout, String offerResultDetail) { this.tokenId = tokenIdGenerator.addAndGet(1); @@ -94,6 +97,7 @@ public class QueueToken implements Comparable<QueueToken> { return false; } finally { this.tokenLock.unlock(); + this.setQueueTimeWhenQueueEnd(); } } @@ -126,6 +130,33 @@ public class QueueToken implements Comparable<QueueToken> { return this.tokenState == TokenState.READY_TO_RUN; } + public void setQueueTimeWhenOfferSuccess() { + long currentTime = System.currentTimeMillis(); + this.queueStartTime = currentTime; + this.queueEndTime = currentTime; + } + + public void setQueueTimeWhenQueueSuccess() { + long currentTime = System.currentTimeMillis(); + this.queueStartTime = currentTime; + } + + public void setQueueTimeWhenQueueEnd() { + this.queueEndTime = System.currentTimeMillis(); + } + + public long getQueueStartTime() { + return queueStartTime; + } + + public long getQueueEndTime() { + return queueEndTime; + } + + public TokenState getTokenState() { + return tokenState; + } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 8f4dbf95616..adf3a9b3ed2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -42,6 +42,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QeProcessorImpl.QueryInfo; +import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.FrontendService; @@ -57,7 +58,6 @@ import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPipelineWorkloadGroup; -import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TSchemaTableRequestParams; import org.apache.doris.thrift.TStatus; @@ -91,11 +91,14 @@ public class MetadataGenerator { private static final ImmutableList<Column> ACTIVE_QUERIES_SCHEMA = ImmutableList.of( new Column("QUERY_ID", ScalarType.createStringType()), - new Column("START_TIME", ScalarType.createStringType()), + new Column("QUERY_START_TIME", ScalarType.createStringType()), new Column("QUERY_TIME_MS", PrimitiveType.BIGINT), new Column("WORKLOAD_GROUP_ID", PrimitiveType.BIGINT), new Column("DATABASE", ScalarType.createStringType()), new Column("FRONTEND_INSTANCE", ScalarType.createStringType()), + new Column("QUEUE_START_TIME", ScalarType.createStringType()), + new Column("QUEUE_END_TIME", ScalarType.createStringType()), + new Column("QUERY_STATUS", ScalarType.createStringType()), new Column("SQL", ScalarType.createStringType())); private static final ImmutableMap<String, Integer> ACTIVE_QUERIES_COLUMN_TO_INDEX; @@ -490,53 +493,6 @@ public class MetadataGenerator { return result; } - private static TRow makeQueryStatisticsTRow(SimpleDateFormat sdf, String queryId, Backend be, - String selfNode, QueryInfo queryInfo, TQueryStatistics qs) { - TRow trow = new TRow(); - if (be != null) { - trow.addToColumnValue(new TCell().setStringVal(be.getHost())); - trow.addToColumnValue(new TCell().setLongVal(be.getBePort())); - } else { - trow.addToColumnValue(new TCell().setStringVal("invalid host")); - trow.addToColumnValue(new TCell().setLongVal(-1)); - } - trow.addToColumnValue(new TCell().setStringVal(queryId)); - - String strDate = sdf.format(new Date(queryInfo.getStartExecTime())); - trow.addToColumnValue(new TCell().setStringVal(strDate)); - trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime())); - - if (qs != null) { - trow.addToColumnValue(new TCell().setLongVal(qs.workload_group_id)); - trow.addToColumnValue(new TCell().setLongVal(qs.cpu_ms)); - trow.addToColumnValue(new TCell().setLongVal(qs.scan_rows)); - trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes)); - trow.addToColumnValue(new TCell().setLongVal(qs.max_peak_memory_bytes)); - trow.addToColumnValue(new TCell().setLongVal(qs.current_used_memory_bytes)); - trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_bytes)); - trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_rows)); - } else { - trow.addToColumnValue(new TCell().setLongVal(0L)); - trow.addToColumnValue(new TCell().setLongVal(0L)); - trow.addToColumnValue(new TCell().setLongVal(0L)); - trow.addToColumnValue(new TCell().setLongVal(0L)); - trow.addToColumnValue(new TCell().setLongVal(0L)); - trow.addToColumnValue(new TCell().setLongVal(0L)); - trow.addToColumnValue(new TCell().setLongVal(0L)); - trow.addToColumnValue(new TCell().setLongVal(0L)); - } - - if (queryInfo.getConnectContext() != null) { - trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase())); - } else { - trow.addToColumnValue(new TCell().setStringVal("")); - } - trow.addToColumnValue(new TCell().setStringVal(selfNode)); - trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql())); - - return trow; - } - private static TFetchSchemaTableDataResult queriesMetadataResult(TSchemaTableRequestParams tSchemaTableParams, TFetchSchemaTableDataRequest parentRequest) { TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); @@ -557,9 +513,15 @@ public class MetadataGenerator { TRow trow = new TRow(); trow.addToColumnValue(new TCell().setStringVal(queryId)); - String strDate = sdf.format(new Date(queryInfo.getStartExecTime())); - trow.addToColumnValue(new TCell().setStringVal(strDate)); - trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime())); + long queryStartTime = queryInfo.getStartExecTime(); + if (queryStartTime > 0) { + trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queryStartTime)))); + trow.addToColumnValue( + new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime())); + } else { + trow.addToColumnValue(new TCell()); + trow.addToColumnValue(new TCell().setLongVal(-1)); + } List<TPipelineWorkloadGroup> tgroupList = queryInfo.getCoord().gettWorkloadGroups(); if (tgroupList != null && tgroupList.size() == 1) { @@ -574,6 +536,30 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setStringVal("")); } trow.addToColumnValue(new TCell().setStringVal(selfNode)); + + long queueStartTime = queryInfo.getQueueStartTime(); + if (queueStartTime > 0) { + trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queueStartTime)))); + } else { + trow.addToColumnValue(new TCell()); + } + + long queueEndTime = queryInfo.getQueueEndTime(); + if (queueEndTime > 0) { + trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queueEndTime)))); + } else { + trow.addToColumnValue(new TCell()); + } + + TokenState tokenState = queryInfo.getQueueStatus(); + if (tokenState == null) { + trow.addToColumnValue(new TCell()); + } else if (tokenState == TokenState.READY_TO_RUN) { + trow.addToColumnValue(new TCell().setStringVal("RUNNING")); + } else { + trow.addToColumnValue(new TCell().setStringVal("QUEUED")); + } + trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql())); dataBatch.add(trow); } diff --git a/regression-test/suites/query_p0/schema_table/test_active_queries.groovy b/regression-test/suites/query_p0/schema_table/test_active_queries.groovy index eecba3d063b..64c344deec7 100644 --- a/regression-test/suites/query_p0/schema_table/test_active_queries.groovy +++ b/regression-test/suites/query_p0/schema_table/test_active_queries.groovy @@ -22,19 +22,19 @@ suite("test_active_queries") { sql "set experimental_enable_pipeline_engine=false" sql "set experimental_enable_pipeline_x_engine=false" sql "select * from information_schema.active_queries" - sql "select QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from information_schema.active_queries" + sql "select QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from information_schema.active_queries" // pipeline sql "set experimental_enable_pipeline_engine=true" sql "set experimental_enable_pipeline_x_engine=false" sql "select * from information_schema.active_queries" - sql "select QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from information_schema.active_queries" + sql "select QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from information_schema.active_queries" // pipelinex sql "set experimental_enable_pipeline_engine=true" sql "set experimental_enable_pipeline_x_engine=true" sql "select * from information_schema.active_queries" - sql "select QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from information_schema.active_queries" + sql "select QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from information_schema.active_queries" Thread.sleep(1000) } }) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org