This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 45cb1b041ce [Improment]Query queued by be memory (#37559)
45cb1b041ce is described below

commit 45cb1b041ce4671e058a1d09872e0097584f1b29
Author: wangbo <wan...@apache.org>
AuthorDate: Sat Jul 20 15:39:19 2024 +0800

    [Improment]Query queued by be memory (#37559)
    
    ## Proposed changes
    Add a back-pressure mechanism based on memory usage.
    When any BE's memory usage is bigger than `query_queue_by_be_used_memory`,
    queries can be queued in the FE.
---
 be/src/service/backend_service.cpp                 |  10 ++
 be/src/service/backend_service.h                   |   3 +
 .../main/java/org/apache/doris/common/Config.java  |  11 ++
 .../main/java/org/apache/doris/catalog/Env.java    |   9 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |  20 ++-
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |   7 +-
 .../apache/doris/resource/AdmissionControl.java    | 156 +++++++++++++++++++++
 .../doris/resource/workloadgroup/QueryQueue.java   |  90 +++++++-----
 .../doris/resource/workloadgroup/QueueToken.java   |  36 +++--
 .../resource/workloadgroup/WorkloadGroup.java      |   6 +-
 .../doris/tablefunction/MetadataGenerator.java     |  11 +-
 .../org/apache/doris/common/GenericPoolTest.java   |   7 +
 .../apache/doris/utframe/MockedBackendFactory.java |   7 +
 gensrc/thrift/BackendService.thrift                |  14 ++
 .../workload_manager_p0/test_curd_wlg.groovy       |   2 +-
 15 files changed, 325 insertions(+), 64 deletions(-)

diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index 1d85a2bca69..4effc225110 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -1297,4 +1297,14 @@ void 
BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse
     response.__set_report_exec_status_params(*report_exec_status_params);
 }
 
+void BaseBackendService::get_be_resource(TGetBeResourceResult& result,
+                                         const TGetBeResourceRequest& request) 
{
+    int64_t mem_usage = PerfCounters::get_vm_rss();
+    int64_t mem_limit = MemInfo::mem_limit();
+    TGlobalResourceUsage global_resource_usage;
+    global_resource_usage.__set_mem_limit(mem_limit);
+    global_resource_usage.__set_mem_usage(mem_usage);
+    result.__set_global_resource_usage(global_resource_usage);
+}
+
 } // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 4a04f16e853..0ada1bf5393 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -138,6 +138,9 @@ public:
     void get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
                                   const TGetRealtimeExecStatusRequest& 
request) override;
 
+    void get_be_resource(TGetBeResourceResult& result,
+                         const TGetBeResourceRequest& request) override;
+
     
////////////////////////////////////////////////////////////////////////////
     // begin cloud backend functions
     
////////////////////////////////////////////////////////////////////////////
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 39e8e4f1c17..7af0da4f9d6 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1767,6 +1767,17 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
     public static boolean enable_cpu_hard_limit = false;
 
+    @ConfField(mutable = true, description = {
+            "当BE内存用量大于该值时,查询会进入排队逻辑,默认值为-1,代表该值不生效。取值范围0~1的小数",
+            "When be memory usage bigger than this value, query could queue, "
+                    + "default value is -1, means this value not work. Decimal 
value range from 0 to 1"})
+    public static double query_queue_by_be_used_memory = -1;
+
+    @ConfField(mutable = true, description = {"基于内存反压场景FE定时拉取BE内存用量的时间间隔",
+            "In the scenario of memory backpressure, "
+                    + "the time interval for obtaining BE memory usage at 
regular intervals"})
+    public static long get_be_resource_usage_interval_ms = 10000;
+
     @ConfField(mutable = false, masterOnly = true)
     public static int backend_rpc_timeout_ms = 60000; // 1 min
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index c999024d359..49157b19ffb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -242,6 +242,7 @@ import org.apache.doris.qe.JournalObservable;
 import org.apache.doris.qe.QueryCancelWorker;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.resource.AdmissionControl;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
@@ -519,6 +520,8 @@ public class Env {
 
     private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
 
+    private AdmissionControl admissionControl;
+
     private QueryStats queryStats;
 
     private StatisticsCleaner statisticsCleaner;
@@ -784,6 +787,7 @@ public class Env {
         this.workloadGroupMgr = new WorkloadGroupMgr();
         this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
         this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
+        this.admissionControl = new AdmissionControl(systemInfo);
         this.queryStats = new QueryStats();
         this.loadManagerAdapter = new LoadManagerAdapter();
         this.hiveTransactionMgr = new HiveTransactionMgr();
@@ -899,6 +903,10 @@ public class Env {
         return workloadRuntimeStatusMgr;
     }
 
+    public AdmissionControl getAdmissionControl() {
+        return admissionControl;
+    }
+
     public ExternalMetaIdMgr getExternalMetaIdMgr() {
         return externalMetaIdMgr;
     }
@@ -1819,6 +1827,7 @@ public class Env {
         workloadGroupMgr.start();
         workloadSchedPolicyMgr.start();
         workloadRuntimeStatusMgr.start();
+        admissionControl.start();
         splitSourceManager.start();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 19915a070f1..17ea6ceb983 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -64,6 +64,7 @@ import org.apache.doris.planner.ResultSink;
 import org.apache.doris.planner.RuntimeFilter;
 import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SchemaScanNode;
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.proto.InternalService;
@@ -618,6 +619,22 @@ public class Coordinator implements CoordInterface {
         return fragmentParams;
     }
 
+    private boolean shouldQueue() {
+        boolean ret = Config.enable_query_queue && 
!context.getSessionVariable()
+                .getBypassWorkloadGroup() && !isQueryCancelled();
+        if (!ret) {
+            return false;
+        }
+        // a query with ScanNode need not queue only when all its scan node is 
SchemaScanNode
+        for (ScanNode scanNode : this.scanNodes) {
+            boolean isSchemaScanNode = scanNode instanceof SchemaScanNode;
+            if (!isSchemaScanNode) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     // Initiate asynchronous execution of query. Returns as soon as all plan 
fragments
     // have started executing at their respective backends.
     // 'Request' must contain at least a coordinator plan fragment (ie, can't
@@ -629,8 +646,7 @@ public class Coordinator implements CoordInterface {
         if (context != null) {
             if (Config.enable_workload_group) {
                 
this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
-                boolean shouldQueue = Config.enable_query_queue && 
!context.getSessionVariable()
-                        .getBypassWorkloadGroup() && !isQueryCancelled();
+                boolean shouldQueue = this.shouldQueue();
                 if (shouldQueue) {
                     queryQueue = 
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
                     if (queryQueue == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 38b0ac192d2..4f9f51a951e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -26,7 +26,6 @@ import org.apache.doris.common.profile.ExecutionProfile;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.metric.MetricRepo;
-import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TQueryProfile;
@@ -374,11 +373,11 @@ public final class QeProcessorImpl implements QeProcessor 
{
             return -1;
         }
 
-        public TokenState getQueueStatus() {
+        public String getQueueStatus() {
             if (coord.getQueueToken() != null) {
-                return coord.getQueueToken().getTokenState();
+                return coord.getQueueToken().getQueueMsg();
             }
-            return null;
+            return "";
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java 
b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
new file mode 100644
index 00000000000..480afcde5b6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.resource.workloadgroup.QueueToken;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TGetBeResourceRequest;
+import org.apache.doris.thrift.TGetBeResourceResult;
+import org.apache.doris.thrift.TGlobalResourceUsage;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class AdmissionControl extends MasterDaemon {
+
+    public static final Logger LOG = 
LogManager.getLogger(AdmissionControl.class);
+
+    private volatile boolean isAllBeMemoryEnough = true;
+
+    private double currentMemoryLimit = 0;
+
+    private SystemInfoService clusterInfoService;
+
+    public AdmissionControl(SystemInfoService clusterInfoService) {
+        super("get-be-resource-usage-thread", 
Config.get_be_resource_usage_interval_ms);
+        this.clusterInfoService = clusterInfoService;
+    }
+
+    private ConcurrentLinkedQueue<QueueToken> queryWaitQueue = new 
ConcurrentLinkedQueue();
+
+    public void addQueueToken(QueueToken queryQueue) {
+        queryWaitQueue.offer(queryQueue);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        getBeMemoryUsage();
+        notifyWaitQuery();
+    }
+
+    public void getBeMemoryUsage() {
+        if (Config.query_queue_by_be_used_memory < 0) {
+            this.isAllBeMemoryEnough = true;
+            return;
+        }
+        Collection<Backend> backends = 
clusterInfoService.getIdToBackend().values();
+        this.currentMemoryLimit = Config.query_queue_by_be_used_memory;
+        boolean tmpIsAllBeMemoryEnough = true;
+        for (Backend be : backends) {
+            if (!be.isAlive()) {
+                continue;
+            }
+            TNetworkAddress address = null;
+            BackendService.Client client = null;
+            TGetBeResourceResult result = null;
+            boolean rpcOk = true;
+            try {
+                address = new TNetworkAddress(be.getHost(), be.getBePort());
+                client = ClientPool.backendPool.borrowObject(address, 5000);
+                result = client.getBeResource(new TGetBeResourceRequest());
+            } catch (Throwable t) {
+                rpcOk = false;
+                LOG.warn("get be {} resource failed, ", be.getHost(), t);
+            } finally {
+                try {
+                    if (rpcOk) {
+                        ClientPool.backendPool.returnObject(address, client);
+                    } else {
+                        ClientPool.backendPool.invalidateObject(address, 
client);
+                    }
+                } catch (Throwable e) {
+                    LOG.warn("return rpc client failed. related backend[{}]", 
be.getHost(),
+                            e);
+                }
+            }
+            if (result != null && result.isSetGlobalResourceUsage()) {
+                TGlobalResourceUsage globalResourceUsage = 
result.getGlobalResourceUsage();
+                if (globalResourceUsage != null && 
globalResourceUsage.isSetMemLimit()
+                        && globalResourceUsage.isSetMemUsage()) {
+                    long memUsageL = globalResourceUsage.getMemUsage();
+                    long memLimitL = globalResourceUsage.getMemLimit();
+                    double memUsage = 
Double.valueOf(String.valueOf(memUsageL));
+                    double memLimit = 
Double.valueOf(String.valueOf(memLimitL));
+                    double memUsagePercent = memUsage / memLimit;
+
+                    if (memUsagePercent > this.currentMemoryLimit) {
+                        tmpIsAllBeMemoryEnough = false;
+                    }
+                    LOG.debug(
+                            "be ip:{}, mem limit:{}, mem usage:{}, mem usage 
percent:{}, "
+                                    + "query queue mem:{}, query wait size:{}",
+                            be.getHost(), memLimitL, memUsageL, 
memUsagePercent, this.currentMemoryLimit,
+                            this.queryWaitQueue.size());
+                }
+            }
+        }
+        this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough;
+    }
+
+    public void notifyWaitQuery() {
+        if (!isAllBeMemoryEnough()) {
+            return;
+        }
+        int waitQueryCountSnapshot = queryWaitQueue.size();
+        Iterator<QueueToken> queueTokenIterator = queryWaitQueue.iterator();
+        while (waitQueryCountSnapshot > 0 && queueTokenIterator.hasNext()) {
+            QueueToken queueToken = queueTokenIterator.next();
+            queueToken.notifyWaitQuery();
+            waitQueryCountSnapshot--;
+        }
+    }
+
+    public void removeQueueToken(QueueToken queueToken) {
+        queryWaitQueue.remove(queueToken);
+    }
+
+    public boolean isAllBeMemoryEnough() {
+        return isAllBeMemoryEnough;
+    }
+
+    //TODO(wb): add more resource type
+    public boolean checkResourceAvailable(QueueToken queueToken) {
+        if (isAllBeMemoryEnough()) {
+            return true;
+        } else {
+            queueToken.setQueueMsg("WAIT_BE_MEMORY");
+            return false;
+        }
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 07d5939cc4f..ba2d2526f2e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -17,14 +17,18 @@
 
 package org.apache.doris.resource.workloadgroup;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.resource.AdmissionControl;
 import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
 
-import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.LinkedList;
 import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.concurrent.locks.ReentrantLock;
 
 // note(wb) refer java BlockingQueue, but support altering capacity
@@ -38,8 +42,6 @@ public class QueryQueue {
     private int maxConcurrency;
     private int maxQueueSize;
     private int queueTimeout; // ms
-    // running property
-    private volatile int currentRunningQueryNum;
 
     public static final String RUNNING_QUERY_NUM = "running_query_num";
     public static final String WAITING_QUERY_NUM = "waiting_query_num";
@@ -48,16 +50,13 @@ public class QueryQueue {
 
     private long propVersion;
 
-    private PriorityQueue<QueueToken> priorityTokenQueue;
+    private PriorityQueue<QueueToken> waitingQueryQueue;
+    private Queue<QueueToken> runningQueryQueue;
 
-    int getCurrentRunningQueryNum() {
-        return currentRunningQueryNum;
-    }
-
-    int getCurrentWaitingQueryNum() {
+    Pair<Integer, Integer> getQueryQueueDetail() {
         try {
             queueLock.lock();
-            return priorityTokenQueue.size();
+            return Pair.of(runningQueryQueue.size(), waitingQueryQueue.size());
         } finally {
             queueLock.unlock();
         }
@@ -89,35 +88,47 @@ public class QueryQueue {
         this.maxQueueSize = maxQueueSize;
         this.queueTimeout = queueTimeout;
         this.propVersion = propVersion;
-        this.priorityTokenQueue = new PriorityQueue<QueueToken>();
+        this.waitingQueryQueue = new PriorityQueue<QueueToken>();
+        this.runningQueryQueue = new LinkedList<QueueToken>();
     }
 
     public String debugString() {
         return "wgId= " + wgId + ", version=" + this.propVersion + 
",maxConcurrency=" + maxConcurrency
-                + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + 
queueTimeout
-                + ", currentRunningQueryNum=" + currentRunningQueryNum
-                + ", currentWaitingQueryNum=" + priorityTokenQueue.size();
+                + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + 
queueTimeout + ", currentRunningQueryNum="
+                + runningQueryQueue.size() + ", currentWaitingQueryNum=" + 
waitingQueryQueue.size();
     }
 
     public QueueToken getToken() throws UserException {
+        AdmissionControl admissionControl = 
Env.getCurrentEnv().getAdmissionControl();
         queueLock.lock();
         try {
             if (LOG.isDebugEnabled()) {
                 LOG.info(this.debugString());
             }
-            if (currentRunningQueryNum < maxConcurrency) {
-                QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, 
queueTimeout, this);
-                retToken.complete();
-                currentRunningQueryNum++;
-                return retToken;
-            }
-            if (priorityTokenQueue.size() >= maxQueueSize) {
+            QueueToken queueToken = new QueueToken(queueTimeout, this);
+
+            boolean isReachMaxCon = runningQueryQueue.size() >= maxConcurrency;
+            boolean isResourceAvailable = 
admissionControl.checkResourceAvailable(queueToken);
+            if (!isReachMaxCon && isResourceAvailable) {
+                runningQueryQueue.offer(queueToken);
+                queueToken.complete();
+                return queueToken;
+            } else if (waitingQueryQueue.size() >= maxQueueSize) {
                 throw new UserException("query waiting queue is full, queue 
length=" + maxQueueSize);
+            } else {
+                if (isReachMaxCon) {
+                    queueToken.setQueueMsg("WAIT_IN_QUEUE");
+                }
+                queueToken.setTokenState(TokenState.ENQUEUE_SUCCESS);
+                this.waitingQueryQueue.offer(queueToken);
+                // if a query is added to wg's queue but not in 
AdmissionControl's
+                // queue may be blocked by be memory later,
+                // then we should put query to AdmissionControl in 
releaseAndNotify, it's too complicated.
+                // To simplify the code logic, put all waiting query to 
AdmissionControl,
+                // waiting query can be notified when query finish or memory 
is enough.
+                admissionControl.addQueueToken(queueToken);
             }
-            QueueToken newQueryToken = new 
QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout,
-                    this);
-            this.priorityTokenQueue.offer(newQueryToken);
-            return newQueryToken;
+            return queueToken;
         } finally {
             if (LOG.isDebugEnabled()) {
                 LOG.info(this.debugString());
@@ -126,23 +137,30 @@ public class QueryQueue {
         }
     }
 
+    public void notifyWaitQuery() {
+        releaseAndNotify(null);
+    }
+
     public void releaseAndNotify(QueueToken releaseToken) {
+        AdmissionControl admissionControl = 
Env.getCurrentEnv().getAdmissionControl();
         queueLock.lock();
         try {
-            // NOTE:token's tokenState need to be locked by queueLock
-            if (releaseToken.isReadyToRun()) {
-                currentRunningQueryNum--;
-            } else {
-                priorityTokenQueue.remove(releaseToken);
-            }
-            Preconditions.checkArgument(currentRunningQueryNum >= 0);
-            while (currentRunningQueryNum < maxConcurrency) {
-                QueueToken queueToken = this.priorityTokenQueue.poll();
+            runningQueryQueue.remove(releaseToken);
+            waitingQueryQueue.remove(releaseToken);
+            admissionControl.removeQueueToken(releaseToken);
+            while (runningQueryQueue.size() < maxConcurrency) {
+                QueueToken queueToken = waitingQueryQueue.peek();
                 if (queueToken == null) {
                     break;
                 }
-                queueToken.complete();
-                currentRunningQueryNum++;
+                if (admissionControl.checkResourceAvailable(queueToken)) {
+                    queueToken.complete();
+                    runningQueryQueue.offer(queueToken);
+                    waitingQueryQueue.remove();
+                    admissionControl.removeQueueToken(queueToken);
+                } else {
+                    break;
+                }
             }
         } finally {
             queueLock.unlock();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index 20a46a526f5..748c0c21bda 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -48,31 +48,46 @@ public class QueueToken implements Comparable<QueueToken> {
 
     private long tokenId = 0;
 
-    private TokenState tokenState;
+    private volatile TokenState tokenState;
 
     private long queueWaitTimeout = 0;
 
     private long queueStartTime = -1;
     private long queueEndTime = -1;
 
+    private volatile String queueMsg = "";
+
+    QueryQueue queryQueue = null;
+
     // Object is just a placeholder, it's meaningless now
     private CompletableFuture<Object> future;
-    private QueryQueue queue;
 
-    public QueueToken(TokenState tokenState, long queueWaitTimeout, QueryQueue 
queryQueue) {
+    public QueueToken(long queueWaitTimeout, QueryQueue queryQueue) {
         this.tokenId = tokenIdGenerator.addAndGet(1);
-        this.tokenState = tokenState;
         this.queueWaitTimeout = queueWaitTimeout;
-        this.queue = queryQueue;
         this.queueStartTime = System.currentTimeMillis();
+        this.queryQueue = queryQueue;
         this.future = new CompletableFuture<>();
     }
 
+    public void setQueueMsg(String msg) {
+        this.queueMsg = msg;
+    }
+
+    public void setTokenState(TokenState tokenState) {
+        this.tokenState = tokenState;
+    }
+
+    public String getQueueMsg() {
+        return queueMsg;
+    }
+
     public void get(String queryId, int queryTimeout) throws UserException {
         if (isReadyToRun()) {
             return;
         }
-        long waitTimeout = Math.min(queueWaitTimeout, queryTimeout);
+        long waitTimeout = queueWaitTimeout > 0 ? Math.min(queueWaitTimeout, 
queryTimeout) : queryTimeout;
+        waitTimeout = waitTimeout <= 0 ? 4096 : waitTimeout;
         try {
             future.get(waitTimeout, TimeUnit.MILLISECONDS);
         } catch (TimeoutException e) {
@@ -89,9 +104,14 @@ public class QueueToken implements Comparable<QueueToken> {
     public void complete() {
         this.queueEndTime = System.currentTimeMillis();
         this.tokenState = TokenState.READY_TO_RUN;
+        this.setQueueMsg("RUNNING");
         future.complete(null);
     }
 
+    public void notifyWaitQuery() {
+        this.queryQueue.notifyWaitQuery();
+    }
+
     public void cancel() {
         future.cancel(true);
     }
@@ -104,10 +124,6 @@ public class QueueToken implements Comparable<QueueToken> {
         return queueEndTime;
     }
 
-    public TokenState getTokenState() {
-        return tokenState;
-    }
-
     public boolean isReadyToRun() {
         return tokenState == TokenState.READY_TO_RUN;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index e5ec2c619b6..d96905af233 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -20,6 +20,7 @@ package org.apache.doris.resource.workloadgroup;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.proc.BaseProcResult;
@@ -420,6 +421,7 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
         List<String> row = new ArrayList<>();
         row.add(String.valueOf(id));
         row.add(name);
+        Pair<Integer, Integer> queryQueueDetail = qq != null ? 
qq.getQueryQueueDetail() : null;
         // skip id,name,running query,waiting query
         for (int i = 2; i < 
WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) {
             String key = 
WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i);
@@ -461,9 +463,9 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
                     row.add(val + "%");
                 }
             } else if (QueryQueue.RUNNING_QUERY_NUM.equals(key)) {
-                row.add(qq == null ? "0" : 
String.valueOf(qq.getCurrentRunningQueryNum()));
+                row.add(queryQueueDetail == null ? "0" : 
String.valueOf(queryQueueDetail.first));
             } else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) {
-                row.add(qq == null ? "0" : 
String.valueOf(qq.getCurrentWaitingQueryNum()));
+                row.add(queryQueueDetail == null ? "0" : 
String.valueOf(queryQueueDetail.second));
             } else if (TAG.equals(key)) {
                 String val = properties.get(key);
                 if (StringUtils.isEmpty(val)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 91690e23b95..3da993f5190 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -55,7 +55,6 @@ import org.apache.doris.plsql.metastore.PlsqlStoredProcedure;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
-import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -593,14 +592,8 @@ public class MetadataGenerator {
                 trow.addToColumnValue(new TCell());
             }
 
-            TokenState tokenState = queryInfo.getQueueStatus();
-            if (tokenState == null) {
-                trow.addToColumnValue(new TCell());
-            } else if (tokenState == TokenState.READY_TO_RUN) {
-                trow.addToColumnValue(new TCell().setStringVal("RUNNING"));
-            } else {
-                trow.addToColumnValue(new TCell().setStringVal("QUEUED"));
-            }
+            String queueMsg = queryInfo.getQueueStatus();
+            trow.addToColumnValue(new TCell().setStringVal(queueMsg));
 
             trow.addToColumnValue(new 
TCell().setStringVal(queryInfo.getSql()));
             dataBatch.add(trow);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index 31fffe6a332..df6b5d440c8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -31,6 +31,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentResult;
 import org.apache.doris.thrift.TExportStatusResult;
 import org.apache.doris.thrift.TExportTaskRequest;
+import org.apache.doris.thrift.TGetBeResourceRequest;
+import org.apache.doris.thrift.TGetBeResourceResult;
 import org.apache.doris.thrift.TGetRealtimeExecStatusRequest;
 import org.apache.doris.thrift.TGetRealtimeExecStatusResponse;
 import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
@@ -160,6 +162,11 @@ public class GenericPoolTest {
             return null;
         }
 
+        @Override
+        public TGetBeResourceResult getBeResource(TGetBeResourceRequest 
request) throws TException {
+            return null;
+        }
+
         @Override
         public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) 
throws TException {
             // TODO Auto-generated method stub
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 3934e140f67..f82ca02bbc3 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -49,6 +49,8 @@ import org.apache.doris.thrift.TExportState;
 import org.apache.doris.thrift.TExportStatusResult;
 import org.apache.doris.thrift.TExportTaskRequest;
 import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TGetBeResourceRequest;
+import org.apache.doris.thrift.TGetBeResourceResult;
 import org.apache.doris.thrift.TGetRealtimeExecStatusRequest;
 import org.apache.doris.thrift.TGetRealtimeExecStatusResponse;
 import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
@@ -366,6 +368,11 @@ public class MockedBackendFactory {
             return new TPublishTopicResult(new TStatus(TStatusCode.OK));
         }
 
+        @Override
+        public TGetBeResourceResult getBeResource(TGetBeResourceRequest 
request) throws TException {
+            return null;
+        }
+
         @Override
         public TStatus submitExportTask(TExportTaskRequest request) throws 
TException {
             return new TStatus(TStatusCode.OK);
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 26cf411f739..39b49ca5462 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -333,6 +333,18 @@ struct TGetRealtimeExecStatusResponse {
     2: optional FrontendService.TReportExecStatusParams 
report_exec_status_params
 }
 
+struct TGetBeResourceRequest {
+}
+
+struct TGlobalResourceUsage {
+    1: optional i64 mem_limit
+    2: optional i64 mem_usage
+}
+
+struct TGetBeResourceResult {
+    1: optional TGlobalResourceUsage global_resource_usage
+}
+
 service BackendService {
     // Called by coord to start asynchronous execution of plan fragment in 
backend.
     // Returns as soon as all incoming data streams have been set up.
@@ -401,4 +413,6 @@ service BackendService {
     TPublishTopicResult publish_topic_info(1:TPublishTopicRequest 
topic_request);
 
     TGetRealtimeExecStatusResponse 
get_realtime_exec_status(1:TGetRealtimeExecStatusRequest request);
+
+    TGetBeResourceResult get_be_resource(1: TGetBeResourceRequest request);
 }
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy 
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 091bbf44c82..f84e913ec35 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -536,7 +536,7 @@ suite("test_crud_wlg") {
     sql "create workload group if not exists bypass_group properties (  
'max_concurrency'='0','max_queue_size'='0','queue_timeout'='0');"
     sql "set workload_group=bypass_group;"
     test {
-        sql "select count(1) from information_schema.active_queries;"
+        sql "select count(1) from ${table_name};"
         exception "query waiting queue is full"
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to