This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b1d63402d6c Add QUEUE_START_TIME/QUEUE_END_TIME/QUERY_STATUS column for active_queries (#32259)
b1d63402d6c is described below

commit b1d63402d6c179dfaea25c244c48e3f761099fa1
Author: wangbo <wan...@apache.org>
AuthorDate: Fri Mar 15 22:06:47 2024 +0800

    Add QUEUE_START_TIME/QUEUE_END_TIME/QUERY_STATUS column for active_queries (#32259)
---
 .../schema_active_queries_scanner.cpp              |  8 +-
 .../java/org/apache/doris/catalog/SchemaTable.java |  5 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  4 +
 .../java/org/apache/doris/qe/QeProcessorImpl.java  | 29 ++++++-
 .../org/apache/doris/qe/QueryStatisticsItem.java   |  6 +-
 .../doris/resource/workloadgroup/QueryQueue.java   |  6 +-
 .../doris/resource/workloadgroup/QueueToken.java   | 33 +++++++-
 .../doris/tablefunction/MetadataGenerator.java     | 90 +++++++++-------------
 .../schema_table/test_active_queries.groovy        |  6 +-
 9 files changed, 123 insertions(+), 64 deletions(-)

diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
index f16326dc8f5..36cb145e3f5 100644
--- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
@@ -29,11 +29,14 @@ namespace doris {
 std::vector<SchemaScanner::ColumnDesc> 
SchemaActiveQueriesScanner::_s_tbls_columns = {
         //   name,       type,          size
         {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), true},
-        {"START_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"QUERY_START_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
         {"QUERY_TIME_MS", TYPE_BIGINT, sizeof(int64_t), true},
         {"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), true},
         {"DATABASE", TYPE_VARCHAR, sizeof(StringRef), true},
         {"FRONTEND_INSTANCE", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"QUEUE_START_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"QUEUE_END_TIME", TYPE_VARCHAR, sizeof(StringRef), true},
+        {"QUERY_STATUS", TYPE_VARCHAR, sizeof(StringRef), true},
         {"SQL", TYPE_STRING, sizeof(StringRef), true}};
 
 SchemaActiveQueriesScanner::SchemaActiveQueriesScanner()
@@ -127,6 +130,9 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
         insert_string_value(4, row.column_value[4].stringVal, 
_active_query_block.get());
         insert_string_value(5, row.column_value[5].stringVal, 
_active_query_block.get());
         insert_string_value(6, row.column_value[6].stringVal, 
_active_query_block.get());
+        insert_string_value(7, row.column_value[7].stringVal, 
_active_query_block.get());
+        insert_string_value(8, row.column_value[8].stringVal, 
_active_query_block.get());
+        insert_string_value(9, row.column_value[9].stringVal, 
_active_query_block.get());
     }
     return Status::OK();
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 736df74f1e6..b89b7e1ea6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -460,11 +460,14 @@ public class SchemaTable extends Table {
                                     .build()))
             .put("active_queries", new 
SchemaTable(SystemIdGenerator.getNextId(), "active_queries", TableType.SCHEMA,
                     builder().column("QUERY_ID", ScalarType.createVarchar(256))
-                            .column("START_TIME", 
ScalarType.createVarchar(256))
+                            .column("QUERY_START_TIME", 
ScalarType.createVarchar(256))
                             .column("QUERY_TIME_MS", 
ScalarType.createType(PrimitiveType.BIGINT))
                             .column("WORKLOAD_GROUP_ID", 
ScalarType.createType(PrimitiveType.BIGINT))
                             .column("DATABASE", ScalarType.createVarchar(256))
                             .column("FRONTEND_INSTANCE", 
ScalarType.createVarchar(256))
+                            .column("QUEUE_START_TIME", 
ScalarType.createVarchar(256))
+                            .column("QUEUE_END_TIME", 
ScalarType.createVarchar(256))
+                            .column("QUERY_STATUS", 
ScalarType.createVarchar(256))
                             .column("SQL", ScalarType.createStringType())
                             .build()))
             .put("workload_groups", new 
SchemaTable(SystemIdGenerator.getNextId(), "workload_groups", TableType.SCHEMA,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 2671228fc3c..7a213e47862 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3906,6 +3906,10 @@ public class Coordinator implements CoordInterface {
         }
     }
 
+    public QueueToken getQueueToken() {
+        return queueToken;
+    }
+
     // fragment instance exec param, it is used to assemble
     // the per-instance TPlanFragmentExecParams, as a member of
     // FragmentExecParams
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 55536639bfd..03144fc797c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.ExecutionProfile;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TReportExecStatusParams;
@@ -265,7 +266,6 @@ public final class QeProcessorImpl implements QeProcessor {
         private final ConnectContext connectContext;
         private final Coordinator coord;
         private final String sql;
-        private final long startExecTime;
 
         // from Export, Pull load, Insert
         public QueryInfo(Coordinator coord) {
@@ -277,7 +277,6 @@ public final class QeProcessorImpl implements QeProcessor {
             this.connectContext = connectContext;
             this.coord = coord;
             this.sql = sql;
-            this.startExecTime = System.currentTimeMillis();
         }
 
         public ConnectContext getConnectContext() {
@@ -293,7 +292,31 @@ public final class QeProcessorImpl implements QeProcessor {
         }
 
         public long getStartExecTime() {
-            return startExecTime;
+            if (coord.getQueueToken() != null) {
+                return coord.getQueueToken().getQueueEndTime();
+            }
+            return -1;
+        }
+
+        public long getQueueStartTime() {
+            if (coord.getQueueToken() != null) {
+                return coord.getQueueToken().getQueueStartTime();
+            }
+            return -1;
+        }
+
+        public long getQueueEndTime() {
+            if (coord.getQueueToken() != null) {
+                return coord.getQueueToken().getQueueEndTime();
+            }
+            return -1;
+        }
+
+        public TokenState getQueueStatus() {
+            if (coord.getQueueToken() != null) {
+                return coord.getQueueToken().getTokenState();
+            }
+            return null;
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
index 79c3e083113..76b528464d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
@@ -74,7 +74,11 @@ public final class QueryStatisticsItem {
 
     public String getQueryExecTime() {
         final long currentTime = System.currentTimeMillis();
-        return String.valueOf(currentTime - queryStartTime);
+        if (queryStartTime <= 0) {
+            return String.valueOf(-1);
+        } else {
+            return String.valueOf(currentTime - queryStartTime);
+        }
     }
 
     public String getQueryId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 7ba6353e746..5953edbf66e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -100,7 +100,6 @@ public class QueryQueue {
     }
 
     public QueueToken getToken() throws UserException {
-
         queueLock.lock();
         try {
             if (LOG.isDebugEnabled()) {
@@ -108,13 +107,16 @@ public class QueryQueue {
             }
             if (currentRunningQueryNum < maxConcurrency) {
                 currentRunningQueryNum++;
-                return new QueueToken(TokenState.READY_TO_RUN, queueTimeout, 
"offer success");
+                QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, 
queueTimeout, "offer success");
+                retToken.setQueueTimeWhenOfferSuccess();
+                return retToken;
             }
             if (priorityTokenQueue.size() >= maxQueueSize) {
                 throw new UserException("query waiting queue is full, queue 
length=" + maxQueueSize);
             }
             QueueToken newQueryToken = new 
QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout,
                     "query wait timeout " + queueTimeout + " ms");
+            newQueryToken.setQueueTimeWhenQueueSuccess();
             this.priorityTokenQueue.offer(newQueryToken);
             return newQueryToken;
         } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index 189ba77e8de..6bf44c78828 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -36,7 +36,7 @@ public class QueueToken implements Comparable<QueueToken> {
         return Long.compare(this.tokenId, other.getTokenId());
     }
 
-    enum TokenState {
+    public enum TokenState {
         ENQUEUE_SUCCESS,
         READY_TO_RUN
     }
@@ -56,6 +56,9 @@ public class QueueToken implements Comparable<QueueToken> {
     private final ReentrantLock tokenLock = new ReentrantLock();
     private final Condition tokenCond = tokenLock.newCondition();
 
+    private long queueStartTime = -1;
+    private long queueEndTime = -1;
+
     public QueueToken(TokenState tokenState, long queueWaitTimeout,
             String offerResultDetail) {
         this.tokenId = tokenIdGenerator.addAndGet(1);
@@ -94,6 +97,7 @@ public class QueueToken implements Comparable<QueueToken> {
             return false;
         } finally {
             this.tokenLock.unlock();
+            this.setQueueTimeWhenQueueEnd();
         }
     }
 
@@ -126,6 +130,33 @@ public class QueueToken implements Comparable<QueueToken> {
         return this.tokenState == TokenState.READY_TO_RUN;
     }
 
+    public void setQueueTimeWhenOfferSuccess() {
+        long currentTime = System.currentTimeMillis();
+        this.queueStartTime = currentTime;
+        this.queueEndTime = currentTime;
+    }
+
+    public void setQueueTimeWhenQueueSuccess() {
+        long currentTime = System.currentTimeMillis();
+        this.queueStartTime = currentTime;
+    }
+
+    public void setQueueTimeWhenQueueEnd() {
+        this.queueEndTime = System.currentTimeMillis();
+    }
+
+    public long getQueueStartTime() {
+        return queueStartTime;
+    }
+
+    public long getQueueEndTime() {
+        return queueEndTime;
+    }
+
+    public TokenState getTokenState() {
+        return tokenState;
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (this == obj) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 8f4dbf95616..adf3a9b3ed2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -42,6 +42,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
+import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.FrontendService;
@@ -57,7 +58,6 @@ import org.apache.doris.thrift.TMetadataTableRequestParams;
 import org.apache.doris.thrift.TMetadataType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
-import org.apache.doris.thrift.TQueryStatistics;
 import org.apache.doris.thrift.TRow;
 import org.apache.doris.thrift.TSchemaTableRequestParams;
 import org.apache.doris.thrift.TStatus;
@@ -91,11 +91,14 @@ public class MetadataGenerator {
 
     private static final ImmutableList<Column> ACTIVE_QUERIES_SCHEMA = 
ImmutableList.of(
             new Column("QUERY_ID", ScalarType.createStringType()),
-            new Column("START_TIME", ScalarType.createStringType()),
+            new Column("QUERY_START_TIME", ScalarType.createStringType()),
             new Column("QUERY_TIME_MS", PrimitiveType.BIGINT),
             new Column("WORKLOAD_GROUP_ID", PrimitiveType.BIGINT),
             new Column("DATABASE", ScalarType.createStringType()),
             new Column("FRONTEND_INSTANCE", ScalarType.createStringType()),
+            new Column("QUEUE_START_TIME", ScalarType.createStringType()),
+            new Column("QUEUE_END_TIME", ScalarType.createStringType()),
+            new Column("QUERY_STATUS", ScalarType.createStringType()),
             new Column("SQL", ScalarType.createStringType()));
 
     private static final ImmutableMap<String, Integer> 
ACTIVE_QUERIES_COLUMN_TO_INDEX;
@@ -490,53 +493,6 @@ public class MetadataGenerator {
         return result;
     }
 
-    private static TRow makeQueryStatisticsTRow(SimpleDateFormat sdf, String 
queryId, Backend be,
-            String selfNode, QueryInfo queryInfo, TQueryStatistics qs) {
-        TRow trow = new TRow();
-        if (be != null) {
-            trow.addToColumnValue(new TCell().setStringVal(be.getHost()));
-            trow.addToColumnValue(new TCell().setLongVal(be.getBePort()));
-        } else {
-            trow.addToColumnValue(new TCell().setStringVal("invalid host"));
-            trow.addToColumnValue(new TCell().setLongVal(-1));
-        }
-        trow.addToColumnValue(new TCell().setStringVal(queryId));
-
-        String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
-        trow.addToColumnValue(new TCell().setStringVal(strDate));
-        trow.addToColumnValue(new 
TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
-
-        if (qs != null) {
-            trow.addToColumnValue(new 
TCell().setLongVal(qs.workload_group_id));
-            trow.addToColumnValue(new TCell().setLongVal(qs.cpu_ms));
-            trow.addToColumnValue(new TCell().setLongVal(qs.scan_rows));
-            trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes));
-            trow.addToColumnValue(new 
TCell().setLongVal(qs.max_peak_memory_bytes));
-            trow.addToColumnValue(new 
TCell().setLongVal(qs.current_used_memory_bytes));
-            trow.addToColumnValue(new 
TCell().setLongVal(qs.shuffle_send_bytes));
-            trow.addToColumnValue(new 
TCell().setLongVal(qs.shuffle_send_rows));
-        } else {
-            trow.addToColumnValue(new TCell().setLongVal(0L));
-            trow.addToColumnValue(new TCell().setLongVal(0L));
-            trow.addToColumnValue(new TCell().setLongVal(0L));
-            trow.addToColumnValue(new TCell().setLongVal(0L));
-            trow.addToColumnValue(new TCell().setLongVal(0L));
-            trow.addToColumnValue(new TCell().setLongVal(0L));
-            trow.addToColumnValue(new TCell().setLongVal(0L));
-            trow.addToColumnValue(new TCell().setLongVal(0L));
-        }
-
-        if (queryInfo.getConnectContext() != null) {
-            trow.addToColumnValue(new 
TCell().setStringVal(queryInfo.getConnectContext().getDatabase()));
-        } else {
-            trow.addToColumnValue(new TCell().setStringVal(""));
-        }
-        trow.addToColumnValue(new TCell().setStringVal(selfNode));
-        trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
-
-        return trow;
-    }
-
     private static TFetchSchemaTableDataResult 
queriesMetadataResult(TSchemaTableRequestParams tSchemaTableParams,
             TFetchSchemaTableDataRequest parentRequest) {
         TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
@@ -557,9 +513,15 @@ public class MetadataGenerator {
             TRow trow = new TRow();
             trow.addToColumnValue(new TCell().setStringVal(queryId));
 
-            String strDate = sdf.format(new 
Date(queryInfo.getStartExecTime()));
-            trow.addToColumnValue(new TCell().setStringVal(strDate));
-            trow.addToColumnValue(new 
TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
+            long queryStartTime = queryInfo.getStartExecTime();
+            if (queryStartTime > 0) {
+                trow.addToColumnValue(new TCell().setStringVal(sdf.format(new 
Date(queryStartTime))));
+                trow.addToColumnValue(
+                        new TCell().setLongVal(System.currentTimeMillis() - 
queryInfo.getStartExecTime()));
+            } else {
+                trow.addToColumnValue(new TCell());
+                trow.addToColumnValue(new TCell().setLongVal(-1));
+            }
 
             List<TPipelineWorkloadGroup> tgroupList = 
queryInfo.getCoord().gettWorkloadGroups();
             if (tgroupList != null && tgroupList.size() == 1) {
@@ -574,6 +536,30 @@ public class MetadataGenerator {
                 trow.addToColumnValue(new TCell().setStringVal(""));
             }
             trow.addToColumnValue(new TCell().setStringVal(selfNode));
+
+            long queueStartTime = queryInfo.getQueueStartTime();
+            if (queueStartTime > 0) {
+                trow.addToColumnValue(new TCell().setStringVal(sdf.format(new 
Date(queueStartTime))));
+            } else {
+                trow.addToColumnValue(new TCell());
+            }
+
+            long queueEndTime = queryInfo.getQueueEndTime();
+            if (queueEndTime > 0) {
+                trow.addToColumnValue(new TCell().setStringVal(sdf.format(new 
Date(queueEndTime))));
+            } else {
+                trow.addToColumnValue(new TCell());
+            }
+
+            TokenState tokenState = queryInfo.getQueueStatus();
+            if (tokenState == null) {
+                trow.addToColumnValue(new TCell());
+            } else if (tokenState == TokenState.READY_TO_RUN) {
+                trow.addToColumnValue(new TCell().setStringVal("RUNNING"));
+            } else {
+                trow.addToColumnValue(new TCell().setStringVal("QUEUED"));
+            }
+
             trow.addToColumnValue(new 
TCell().setStringVal(queryInfo.getSql()));
             dataBatch.add(trow);
         }
diff --git a/regression-test/suites/query_p0/schema_table/test_active_queries.groovy b/regression-test/suites/query_p0/schema_table/test_active_queries.groovy
index eecba3d063b..64c344deec7 100644
--- a/regression-test/suites/query_p0/schema_table/test_active_queries.groovy
+++ b/regression-test/suites/query_p0/schema_table/test_active_queries.groovy
@@ -22,19 +22,19 @@ suite("test_active_queries") {
             sql "set experimental_enable_pipeline_engine=false"
             sql "set experimental_enable_pipeline_x_engine=false"
             sql "select * from information_schema.active_queries"
-            sql "select 
QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from 
information_schema.active_queries"
+            sql "select 
QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from 
information_schema.active_queries"
 
             // pipeline
             sql "set experimental_enable_pipeline_engine=true"
             sql "set experimental_enable_pipeline_x_engine=false"
             sql "select * from information_schema.active_queries"
-            sql "select 
QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from 
information_schema.active_queries"
+            sql "select 
QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from 
information_schema.active_queries"
 
             // pipelinex
             sql "set experimental_enable_pipeline_engine=true"
             sql "set experimental_enable_pipeline_x_engine=true"
             sql "select * from information_schema.active_queries"
-            sql "select 
QUERY_ID,START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL from 
information_schema.active_queries"
+            sql "select 
QUERY_ID,QUERY_START_TIME,QUERY_TIME_MS,WORKLOAD_GROUP_ID,SQL,QUERY_STATUS from 
information_schema.active_queries"
             Thread.sleep(1000)
         }
     })


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to