This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 1c566253a81 [Pick][Improment]Query queued by be memory (#37559) (#39733)
1c566253a81 is described below

commit 1c566253a819ffa6b91eb8fb2925b792520bff26
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Aug 22 15:14:47 2024 +0800

    [Pick][Improment]Query queued by be memory (#37559) (#39733)
    
    pick #37559
---
 be/src/service/backend_service.cpp                 |  11 ++
 be/src/service/backend_service.h                   |   3 +
 .../main/java/org/apache/doris/common/Config.java  |  11 ++
 .../main/java/org/apache/doris/catalog/Env.java    |   9 ++
 .../main/java/org/apache/doris/qe/Coordinator.java |  43 ++++--
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |   7 +-
 .../apache/doris/resource/AdmissionControl.java    | 156 +++++++++++++++++++++
 .../doris/resource/workloadgroup/QueryQueue.java   | 101 +++++++------
 .../doris/resource/workloadgroup/QueueToken.java   | 152 +++++++-------------
 .../resource/workloadgroup/WorkloadGroup.java      |   6 +-
 .../doris/tablefunction/MetadataGenerator.java     |  11 +-
 .../org/apache/doris/common/GenericPoolTest.java   |   7 +
 .../apache/doris/utframe/MockedBackendFactory.java |   7 +
 gensrc/thrift/BackendService.thrift                |  15 ++
 .../workload_manager_p0/test_curd_wlg.groovy       |   4 +-
 15 files changed, 367 insertions(+), 176 deletions(-)

diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index e26264b1a22..c30b936769a 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -1110,4 +1110,15 @@ void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
         break;
     }
 }
+
+void BackendService::get_be_resource(TGetBeResourceResult& result,
+                                     const TGetBeResourceRequest& request) {
+    int64_t mem_usage = PerfCounters::get_vm_rss();
+    int64_t mem_limit = MemInfo::mem_limit();
+    TGlobalResourceUsage global_resource_usage;
+    global_resource_usage.__set_mem_limit(mem_limit);
+    global_resource_usage.__set_mem_usage(mem_usage);
+    result.__set_global_resource_usage(global_resource_usage);
+}
+
 } // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 9d53ec4bc45..bbcf103167f 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -140,6 +140,9 @@ public:
     void query_ingest_binlog(TQueryIngestBinlogResult& result,
                              const TQueryIngestBinlogRequest& request) 
override;
 
+    void get_be_resource(TGetBeResourceResult& result,
+                         const TGetBeResourceRequest& request) override;
+
 private:
     Status start_plan_fragment_execution(const TExecPlanFragmentParams& 
exec_params);
     ExecEnv* _exec_env = nullptr;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6a0394a0be7..5e0cb99b638 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1782,6 +1782,17 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
     public static boolean enable_cpu_hard_limit = false;
 
+    @ConfField(mutable = true, description = {
+            "当BE内存用量大于该值时,查询会进入排队逻辑,默认值为-1,代表该值不生效。取值范围0~1的小数",
+            "When be memory usage bigger than this value, query could queue, "
+                    + "default value is -1, means this value not work. Decimal 
value range from 0 to 1"})
+    public static double query_queue_by_be_used_memory = -1;
+
+    @ConfField(mutable = true, description = {"基于内存反压场景FE定时拉取BE内存用量的时间间隔",
+            "In the scenario of memory backpressure, "
+                    + "the time interval for obtaining BE memory usage at 
regular intervals"})
+    public static long get_be_resource_usage_interval_ms = 10000;
+
     @ConfField(mutable = false, masterOnly = true)
     public static int backend_rpc_timeout_ms = 60000; // 1 min
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 785b7add3a0..d6af35c5bf1 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -242,6 +242,7 @@ import org.apache.doris.qe.JournalObservable;
 import org.apache.doris.qe.QueryCancelWorker;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.resource.AdmissionControl;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
@@ -515,6 +516,8 @@ public class Env {
 
     private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
 
+    private AdmissionControl admissionControl;
+
     private QueryStats queryStats;
 
     private StatisticsCleaner statisticsCleaner;
@@ -772,6 +775,7 @@ public class Env {
         this.workloadGroupMgr = new WorkloadGroupMgr();
         this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
         this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
+        this.admissionControl = new AdmissionControl(systemInfo);
         this.queryStats = new QueryStats();
         this.loadManagerAdapter = new LoadManagerAdapter();
         this.hiveTransactionMgr = new HiveTransactionMgr();
@@ -883,6 +887,10 @@ public class Env {
         return workloadRuntimeStatusMgr;
     }
 
+    public AdmissionControl getAdmissionControl() {
+        return admissionControl;
+    }
+
     public ExternalMetaIdMgr getExternalMetaIdMgr() {
         return externalMetaIdMgr;
     }
@@ -1747,6 +1755,7 @@ public class Env {
         workloadGroupMgr.start();
         workloadSchedPolicyMgr.start();
         workloadRuntimeStatusMgr.start();
+        admissionControl.start();
         splitSourceManager.start();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 19ca1050469..d676ec5b908 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -62,6 +62,7 @@ import org.apache.doris.planner.ResultSink;
 import org.apache.doris.planner.RuntimeFilter;
 import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SchemaScanNode;
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.SortNode;
 import org.apache.doris.planner.UnionNode;
@@ -645,6 +646,22 @@ public class Coordinator implements CoordInterface {
         return fragmentParams;
     }
 
+    private boolean shouldQueue() {
+        boolean ret = Config.enable_query_queue && 
!context.getSessionVariable()
+                .getBypassWorkloadGroup() && !isQueryCancelled();
+        if (!ret) {
+            return false;
+        }
+        // a query can skip the queue only when every one of its scan nodes is a SchemaScanNode
+        for (ScanNode scanNode : this.scanNodes) {
+            boolean isSchemaScanNode = scanNode instanceof SchemaScanNode;
+            if (!isSchemaScanNode) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     // Initiate asynchronous execution of query. Returns as soon as all plan 
fragments
     // have started executing at their respective backends.
     // 'Request' must contain at least a coordinator plan fragment (ie, can't
@@ -656,8 +673,7 @@ public class Coordinator implements CoordInterface {
         if (context != null) {
             if (Config.enable_workload_group) {
                 
this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
-                boolean shouldQueue = Config.enable_query_queue && 
!context.getSessionVariable()
-                        .getBypassWorkloadGroup() && !isQueryCancelled();
+                boolean shouldQueue = this.shouldQueue();
                 if (shouldQueue) {
                     queryQueue = 
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
                     if (queryQueue == null) {
@@ -666,11 +682,8 @@ public class Coordinator implements CoordInterface {
                         throw new UserException("could not find query queue");
                     }
                     queueToken = queryQueue.getToken();
-                    if 
(!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
-                        LOG.error("query (id=" + DebugUtil.printId(queryId) + 
") " + queueToken.getOfferResultDetail());
-                        queryQueue.returnToken(queueToken);
-                        throw new 
UserException(queueToken.getOfferResultDetail());
-                    }
+                    queueToken.get(DebugUtil.printId(queryId),
+                            this.queryOptions.getExecutionTimeout() * 1000);
                 }
             } else {
                 context.setWorkloadGroupName("");
@@ -681,16 +694,22 @@ public class Coordinator implements CoordInterface {
 
     @Override
     public void close() {
-        for (ScanNode scanNode : scanNodes) {
-            scanNode.stop();
-        }
+        // NOTE: close() must not throw; each cleanup step below catches and logs its own failure
         if (queryQueue != null && queueToken != null) {
             try {
-                queryQueue.returnToken(queueToken);
+                queryQueue.releaseAndNotify(queueToken);
             } catch (Throwable t) {
                 LOG.error("error happens when coordinator close ", t);
             }
         }
+
+        try {
+            for (ScanNode scanNode : scanNodes) {
+                scanNode.stop();
+            }
+        } catch (Throwable t) {
+            LOG.error("error happens when scannode stop ", t);
+        }
     }
 
     private void execInternal() throws Exception {
@@ -1516,7 +1535,7 @@ public class Coordinator implements CoordInterface {
     public void cancel() {
         cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
         if (queueToken != null) {
-            queueToken.signalForCancel();
+            queueToken.cancel();
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 26bb3d95db3..1ec23257749 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -25,7 +25,6 @@ import org.apache.doris.common.profile.ExecutionProfile;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.metric.MetricRepo;
-import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TReportExecStatusParams;
@@ -333,11 +332,11 @@ public final class QeProcessorImpl implements QeProcessor {
             return -1;
         }
 
-        public TokenState getQueueStatus() {
+        public String getQueueStatus() {
             if (coord.getQueueToken() != null) {
-                return coord.getQueueToken().getTokenState();
+                return coord.getQueueToken().getQueueMsg();
             }
-            return null;
+            return "";
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
new file mode 100644
index 00000000000..480afcde5b6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.resource.workloadgroup.QueueToken;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TGetBeResourceRequest;
+import org.apache.doris.thrift.TGetBeResourceResult;
+import org.apache.doris.thrift.TGlobalResourceUsage;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class AdmissionControl extends MasterDaemon {
+
+    public static final Logger LOG = 
LogManager.getLogger(AdmissionControl.class);
+
+    private volatile boolean isAllBeMemoryEnough = true;
+
+    private double currentMemoryLimit = 0;
+
+    private SystemInfoService clusterInfoService;
+
+    public AdmissionControl(SystemInfoService clusterInfoService) {
+        super("get-be-resource-usage-thread", 
Config.get_be_resource_usage_interval_ms);
+        this.clusterInfoService = clusterInfoService;
+    }
+
+    private ConcurrentLinkedQueue<QueueToken> queryWaitQueue = new 
ConcurrentLinkedQueue();
+
+    public void addQueueToken(QueueToken queryQueue) {
+        queryWaitQueue.offer(queryQueue);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        getBeMemoryUsage();
+        notifyWaitQuery();
+    }
+
+    public void getBeMemoryUsage() {
+        if (Config.query_queue_by_be_used_memory < 0) {
+            this.isAllBeMemoryEnough = true;
+            return;
+        }
+        Collection<Backend> backends = 
clusterInfoService.getIdToBackend().values();
+        this.currentMemoryLimit = Config.query_queue_by_be_used_memory;
+        boolean tmpIsAllBeMemoryEnough = true;
+        for (Backend be : backends) {
+            if (!be.isAlive()) {
+                continue;
+            }
+            TNetworkAddress address = null;
+            BackendService.Client client = null;
+            TGetBeResourceResult result = null;
+            boolean rpcOk = true;
+            try {
+                address = new TNetworkAddress(be.getHost(), be.getBePort());
+                client = ClientPool.backendPool.borrowObject(address, 5000);
+                result = client.getBeResource(new TGetBeResourceRequest());
+            } catch (Throwable t) {
+                rpcOk = false;
+                LOG.warn("get be {} resource failed, ", be.getHost(), t);
+            } finally {
+                try {
+                    if (rpcOk) {
+                        ClientPool.backendPool.returnObject(address, client);
+                    } else {
+                        ClientPool.backendPool.invalidateObject(address, 
client);
+                    }
+                } catch (Throwable e) {
+                    LOG.warn("return rpc client failed. related backend[{}]", 
be.getHost(),
+                            e);
+                }
+            }
+            if (result != null && result.isSetGlobalResourceUsage()) {
+                TGlobalResourceUsage globalResourceUsage = 
result.getGlobalResourceUsage();
+                if (globalResourceUsage != null && 
globalResourceUsage.isSetMemLimit()
+                        && globalResourceUsage.isSetMemUsage()) {
+                    long memUsageL = globalResourceUsage.getMemUsage();
+                    long memLimitL = globalResourceUsage.getMemLimit();
+                    double memUsage = 
Double.valueOf(String.valueOf(memUsageL));
+                    double memLimit = 
Double.valueOf(String.valueOf(memLimitL));
+                    double memUsagePercent = memUsage / memLimit;
+
+                    if (memUsagePercent > this.currentMemoryLimit) {
+                        tmpIsAllBeMemoryEnough = false;
+                    }
+                    LOG.debug(
+                            "be ip:{}, mem limit:{}, mem usage:{}, mem usage 
percent:{}, "
+                                    + "query queue mem:{}, query wait size:{}",
+                            be.getHost(), memLimitL, memUsageL, 
memUsagePercent, this.currentMemoryLimit,
+                            this.queryWaitQueue.size());
+                }
+            }
+        }
+        this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough;
+    }
+
+    public void notifyWaitQuery() {
+        if (!isAllBeMemoryEnough()) {
+            return;
+        }
+        int waitQueryCountSnapshot = queryWaitQueue.size();
+        Iterator<QueueToken> queueTokenIterator = queryWaitQueue.iterator();
+        while (waitQueryCountSnapshot > 0 && queueTokenIterator.hasNext()) {
+            QueueToken queueToken = queueTokenIterator.next();
+            queueToken.notifyWaitQuery();
+            waitQueryCountSnapshot--;
+        }
+    }
+
+    public void removeQueueToken(QueueToken queueToken) {
+        queryWaitQueue.remove(queueToken);
+    }
+
+    public boolean isAllBeMemoryEnough() {
+        return isAllBeMemoryEnough;
+    }
+
+    //TODO(wb): add more resource type
+    public boolean checkResourceAvailable(QueueToken queueToken) {
+        if (isAllBeMemoryEnough()) {
+            return true;
+        } else {
+            queueToken.setQueueMsg("WAIT_BE_MEMORY");
+            return false;
+        }
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 5953edbf66e..ba2d2526f2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -17,14 +17,18 @@
 
 package org.apache.doris.resource.workloadgroup;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.resource.AdmissionControl;
 import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
 
-import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.LinkedList;
 import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.concurrent.locks.ReentrantLock;
 
 // note(wb) refer java BlockingQueue, but support altering capacity
@@ -38,8 +42,6 @@ public class QueryQueue {
     private int maxConcurrency;
     private int maxQueueSize;
     private int queueTimeout; // ms
-    // running property
-    private volatile int currentRunningQueryNum;
 
     public static final String RUNNING_QUERY_NUM = "running_query_num";
     public static final String WAITING_QUERY_NUM = "waiting_query_num";
@@ -48,16 +50,13 @@ public class QueryQueue {
 
     private long propVersion;
 
-    private PriorityQueue<QueueToken> priorityTokenQueue;
+    private PriorityQueue<QueueToken> waitingQueryQueue;
+    private Queue<QueueToken> runningQueryQueue;
 
-    int getCurrentRunningQueryNum() {
-        return currentRunningQueryNum;
-    }
-
-    int getCurrentWaitingQueryNum() {
+    Pair<Integer, Integer> getQueryQueueDetail() {
         try {
             queueLock.lock();
-            return priorityTokenQueue.size();
+            return Pair.of(runningQueryQueue.size(), waitingQueryQueue.size());
         } finally {
             queueLock.unlock();
         }
@@ -89,36 +88,47 @@ public class QueryQueue {
         this.maxQueueSize = maxQueueSize;
         this.queueTimeout = queueTimeout;
         this.propVersion = propVersion;
-        this.priorityTokenQueue = new PriorityQueue<QueueToken>();
+        this.waitingQueryQueue = new PriorityQueue<QueueToken>();
+        this.runningQueryQueue = new LinkedList<QueueToken>();
     }
 
     public String debugString() {
         return "wgId= " + wgId + ", version=" + this.propVersion + 
",maxConcurrency=" + maxConcurrency
-                + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + 
queueTimeout
-                + ", currentRunningQueryNum=" + currentRunningQueryNum
-                + ", currentWaitingQueryNum=" + priorityTokenQueue.size();
+                + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + 
queueTimeout + ", currentRunningQueryNum="
+                + runningQueryQueue.size() + ", currentWaitingQueryNum=" + 
waitingQueryQueue.size();
     }
 
     public QueueToken getToken() throws UserException {
+        AdmissionControl admissionControl = 
Env.getCurrentEnv().getAdmissionControl();
         queueLock.lock();
         try {
             if (LOG.isDebugEnabled()) {
                 LOG.info(this.debugString());
             }
-            if (currentRunningQueryNum < maxConcurrency) {
-                currentRunningQueryNum++;
-                QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, 
queueTimeout, "offer success");
-                retToken.setQueueTimeWhenOfferSuccess();
-                return retToken;
-            }
-            if (priorityTokenQueue.size() >= maxQueueSize) {
+            QueueToken queueToken = new QueueToken(queueTimeout, this);
+
+            boolean isReachMaxCon = runningQueryQueue.size() >= maxConcurrency;
+            boolean isResourceAvailable = 
admissionControl.checkResourceAvailable(queueToken);
+            if (!isReachMaxCon && isResourceAvailable) {
+                runningQueryQueue.offer(queueToken);
+                queueToken.complete();
+                return queueToken;
+            } else if (waitingQueryQueue.size() >= maxQueueSize) {
                 throw new UserException("query waiting queue is full, queue 
length=" + maxQueueSize);
+            } else {
+                if (isReachMaxCon) {
+                    queueToken.setQueueMsg("WAIT_IN_QUEUE");
+                }
+                queueToken.setTokenState(TokenState.ENQUEUE_SUCCESS);
+                this.waitingQueryQueue.offer(queueToken);
+                // A query that waits only in this workload group's queue could later be
+                // blocked by BE memory as well; registering it with AdmissionControl at that
+                // point (inside releaseAndNotify) would be too complicated.
+                // To keep the logic simple, every waiting query is added to AdmissionControl,
+                // so it can be notified either when a query finishes or when memory is enough.
+                admissionControl.addQueueToken(queueToken);
             }
-            QueueToken newQueryToken = new 
QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout,
-                    "query wait timeout " + queueTimeout + " ms");
-            newQueryToken.setQueueTimeWhenQueueSuccess();
-            this.priorityTokenQueue.offer(newQueryToken);
-            return newQueryToken;
+            return queueToken;
         } finally {
             if (LOG.isDebugEnabled()) {
                 LOG.info(this.debugString());
@@ -127,35 +137,36 @@ public class QueryQueue {
         }
     }
 
-    // If the token is acquired and do work success, then call this method to 
release it.
-    public void returnToken(QueueToken token) {
+    public void notifyWaitQuery() {
+        releaseAndNotify(null);
+    }
+
+    public void releaseAndNotify(QueueToken releaseToken) {
+        AdmissionControl admissionControl = 
Env.getCurrentEnv().getAdmissionControl();
         queueLock.lock();
         try {
-            // If current token is not in ready to run state, then it is still 
in the queue
-            // it is not running, just remove it.
-            if (!token.isReadyToRun()) {
-                this.priorityTokenQueue.remove(token);
-                return;
-            }
-            currentRunningQueryNum--;
-            Preconditions.checkArgument(currentRunningQueryNum >= 0);
-            // If return token and find user changed concurrency num,  then 
maybe need signal
-            // more tokens.
-            while (currentRunningQueryNum < maxConcurrency) {
-                QueueToken nextToken = this.priorityTokenQueue.poll();
-                if (nextToken != null) {
-                    if (nextToken.signal()) {
-                        ++currentRunningQueryNum;
-                    }
+            runningQueryQueue.remove(releaseToken);
+            waitingQueryQueue.remove(releaseToken);
+            admissionControl.removeQueueToken(releaseToken);
+            while (runningQueryQueue.size() < maxConcurrency) {
+                QueueToken queueToken = waitingQueryQueue.peek();
+                if (queueToken == null) {
+                    break;
+                }
+                if (admissionControl.checkResourceAvailable(queueToken)) {
+                    queueToken.complete();
+                    runningQueryQueue.offer(queueToken);
+                    waitingQueryQueue.remove();
+                    admissionControl.removeQueueToken(queueToken);
                 } else {
                     break;
                 }
             }
         } finally {
+            queueLock.unlock();
             if (LOG.isDebugEnabled()) {
                 LOG.info(this.debugString());
             }
-            queueLock.unlock();
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index 0a982b81236..748c0c21bda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -17,13 +17,16 @@
 
 package org.apache.doris.resource.workloadgroup;
 
+import org.apache.doris.common.UserException;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 // used to mark QueryQueue offer result
 // if offer failed, then need to cancel query
@@ -38,134 +41,79 @@ public class QueueToken implements Comparable<QueueToken> {
 
     public enum TokenState {
         ENQUEUE_SUCCESS,
-        READY_TO_RUN,
-        CANCELLED
+        READY_TO_RUN
     }
 
     static AtomicLong tokenIdGenerator = new AtomicLong(0);
 
     private long tokenId = 0;
 
-    private TokenState tokenState;
+    private volatile TokenState tokenState;
 
     private long queueWaitTimeout = 0;
 
-    private String offerResultDetail;
+    private long queueStartTime = -1;
+    private long queueEndTime = -1;
 
-    private boolean isTimeout = false;
+    private volatile String queueMsg = "";
 
-    private final ReentrantLock tokenLock = new ReentrantLock();
-    private final Condition tokenCond = tokenLock.newCondition();
+    QueryQueue queryQueue = null;
 
-    private long queueStartTime = -1;
-    private long queueEndTime = -1;
+    // Object is just a placeholder, it's meaningless now
+    private CompletableFuture<Object> future;
 
-    public QueueToken(TokenState tokenState, long queueWaitTimeout,
-            String offerResultDetail) {
+    public QueueToken(long queueWaitTimeout, QueryQueue queryQueue) {
         this.tokenId = tokenIdGenerator.addAndGet(1);
-        this.tokenState = tokenState;
         this.queueWaitTimeout = queueWaitTimeout;
-        this.offerResultDetail = offerResultDetail;
+        this.queueStartTime = System.currentTimeMillis();
+        this.queryQueue = queryQueue;
+        this.future = new CompletableFuture<>();
     }
 
-    public boolean waitSignal(long queryTimeoutMillis) throws 
InterruptedException {
-        this.tokenLock.lock();
-        try {
-            if (isTimeout) {
-                return false;
-            }
-            if (tokenState == TokenState.READY_TO_RUN) {
-                return true;
-            }
-            // If query timeout is less than queue wait timeout, then should 
use
-            // query timeout as wait timeout
-            long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? 
queueWaitTimeout : queryTimeoutMillis;
-            tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS);
-            if (tokenState == TokenState.CANCELLED) {
-                this.offerResultDetail = "query is cancelled in queue";
-                return false;
-            }
-            // If wait timeout and is steal not ready to run, then return false
-            if (tokenState != TokenState.READY_TO_RUN) {
-                LOG.warn("wait in queue timeout, timeout = {}", waitTimeout);
-                isTimeout = true;
-                return false;
-            } else {
-                return true;
-            }
-        } catch (Throwable t) {
-            LOG.warn("meet execption when wait for signal", t);
-            // If any exception happens, set isTimeout to true and return false
-            // Then the caller will call returnToken to queue normally.
-            offerResultDetail = "meet exeption when wait for signal";
-            isTimeout = true;
-            return false;
-        } finally {
-            this.tokenLock.unlock();
-            this.setQueueTimeWhenQueueEnd();
-        }
+    public void setQueueMsg(String msg) {
+        this.queueMsg = msg;
     }
 
-    public void signalForCancel() {
-        this.tokenLock.lock();
-        try {
-            if (this.tokenState == TokenState.ENQUEUE_SUCCESS) {
-                tokenCond.signal();
-                this.tokenState = TokenState.CANCELLED;
-            }
-        } catch (Throwable t) {
-            LOG.warn("error happens when signal for cancel", t);
-        } finally {
-            this.tokenLock.unlock();
-        }
+    public void setTokenState(TokenState tokenState) {
+        this.tokenState = tokenState;
     }
 
-    public boolean signal() {
-        this.tokenLock.lock();
+    public String getQueueMsg() {
+        return queueMsg;
+    }
+
+    public void get(String queryId, int queryTimeout) throws UserException {
+        if (isReadyToRun()) {
+            return;
+        }
+        long waitTimeout = queueWaitTimeout > 0 ? Math.min(queueWaitTimeout, 
queryTimeout) : queryTimeout;
+        waitTimeout = waitTimeout <= 0 ? 4096 : waitTimeout;
         try {
-            // If current token is not ENQUEUE_SUCCESS, then it maybe has error
-            // not run it any more.
-            if (this.tokenState != TokenState.ENQUEUE_SUCCESS || isTimeout) {
-                return false;
-            }
-            this.tokenState = TokenState.READY_TO_RUN;
-            tokenCond.signal();
-            return true;
+            future.get(waitTimeout, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+            throw new UserException("query queue timeout, timeout: " + 
waitTimeout + " ms ");
+        } catch (CancellationException e) {
+            throw new UserException("query is cancelled");
         } catch (Throwable t) {
-            isTimeout = true;
-            offerResultDetail = "meet exception when signal";
-            LOG.warn("failed to signal token", t);
-            return false;
-        } finally {
-            this.tokenLock.unlock();
+            String errMsg = String.format("error happens when query {} queue", 
queryId);
+            LOG.error(errMsg, t);
+            throw new RuntimeException(errMsg, t);
         }
     }
 
-    public String getOfferResultDetail() {
-        return offerResultDetail;
-    }
-
-    public boolean isReadyToRun() {
-        return this.tokenState == TokenState.READY_TO_RUN;
-    }
-
-    public boolean isCancelled() {
-        return this.tokenState == TokenState.CANCELLED;
-    }
-
-    public void setQueueTimeWhenOfferSuccess() {
-        long currentTime = System.currentTimeMillis();
-        this.queueStartTime = currentTime;
-        this.queueEndTime = currentTime;
+    public void complete() {
+        this.queueEndTime = System.currentTimeMillis();
+        this.tokenState = TokenState.READY_TO_RUN;
+        this.setQueueMsg("RUNNING");
+        future.complete(null);
     }
 
-    public void setQueueTimeWhenQueueSuccess() {
-        long currentTime = System.currentTimeMillis();
-        this.queueStartTime = currentTime;
+    public void notifyWaitQuery() {
+        this.queryQueue.notifyWaitQuery();
     }
 
-    public void setQueueTimeWhenQueueEnd() {
-        this.queueEndTime = System.currentTimeMillis();
+    public void cancel() {
+        future.cancel(true);
     }
 
     public long getQueueStartTime() {
@@ -176,8 +124,8 @@ public class QueueToken implements Comparable<QueueToken> {
         return queueEndTime;
     }
 
-    public TokenState getTokenState() {
-        return tokenState;
+    public boolean isReadyToRun() {
+        return tokenState == TokenState.READY_TO_RUN;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 8588e10cf34..2c200e425b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.proc.BaseProcResult;
@@ -431,6 +432,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
         List<String> row = new ArrayList<>();
         row.add(String.valueOf(id));
         row.add(name);
+        Pair<Integer, Integer> queryQueueDetail = qq != null ? qq.getQueryQueueDetail() : null;
         // skip id,name,running query,waiting query
         for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) {
             String key = WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i);
@@ -472,9 +474,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
                     row.add(val + "%");
                 }
             } else if (QueryQueue.RUNNING_QUERY_NUM.equals(key)) {
-                row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum()));
+                row.add(queryQueueDetail == null ? "0" : String.valueOf(queryQueueDetail.first));
             } else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) {
-                row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum()));
+                row.add(queryQueueDetail == null ? "0" : String.valueOf(queryQueueDetail.second));
             } else if (TAG.equals(key)) {
                 String val = properties.get(key);
                 if (StringUtils.isEmpty(val)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 7d0f348d18b..9fb37242d84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -51,7 +51,6 @@ import org.apache.doris.plsql.metastore.PlsqlStoredProcedure;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
-import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -612,14 +611,8 @@ public class MetadataGenerator {
                 trow.addToColumnValue(new TCell());
             }
 
-            TokenState tokenState = queryInfo.getQueueStatus();
-            if (tokenState == null) {
-                trow.addToColumnValue(new TCell());
-            } else if (tokenState == TokenState.READY_TO_RUN) {
-                trow.addToColumnValue(new TCell().setStringVal("RUNNING"));
-            } else {
-                trow.addToColumnValue(new TCell().setStringVal("QUEUED"));
-            }
+            String queueMsg = queryInfo.getQueueStatus();
+            trow.addToColumnValue(new TCell().setStringVal(queueMsg));
 
             trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
             dataBatch.add(trow);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index d03d3595682..af921cd1821 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -29,6 +29,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentResult;
 import org.apache.doris.thrift.TExportStatusResult;
 import org.apache.doris.thrift.TExportTaskRequest;
+import org.apache.doris.thrift.TGetBeResourceRequest;
+import org.apache.doris.thrift.TGetBeResourceResult;
 import org.apache.doris.thrift.TIngestBinlogRequest;
 import org.apache.doris.thrift.TIngestBinlogResult;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -148,6 +150,11 @@ public class GenericPoolTest {
             return null;
         }
 
+        @Override
+        public TGetBeResourceResult getBeResource(TGetBeResourceRequest request) throws TException {
+            return null;
+        }
+
         @Override
         public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException {
             // TODO Auto-generated method stub
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 12273331634..de73404b886 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -47,6 +47,8 @@ import org.apache.doris.thrift.TExportState;
 import org.apache.doris.thrift.TExportStatusResult;
 import org.apache.doris.thrift.TExportTaskRequest;
 import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TGetBeResourceRequest;
+import org.apache.doris.thrift.TGetBeResourceResult;
 import org.apache.doris.thrift.THeartbeatResult;
 import org.apache.doris.thrift.TIngestBinlogRequest;
 import org.apache.doris.thrift.TIngestBinlogResult;
@@ -354,6 +356,11 @@ public class MockedBackendFactory {
             return new TPublishTopicResult(new TStatus(TStatusCode.OK));
         }
 
+        @Override
+        public TGetBeResourceResult getBeResource(TGetBeResourceRequest request) throws TException {
+            return null;
+        }
+
         @Override
         public TStatus submitExportTask(TExportTaskRequest request) throws TException {
             return new TStatus(TStatusCode.OK);
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index f80c66dd827..0255d0d61a3 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -242,6 +242,19 @@ struct TPublishTopicResult {
     1: required Status.TStatus status
 }
 
+
+struct TGetBeResourceRequest {
+}
+
+struct TGlobalResourceUsage {
+    1: optional i64 mem_limit
+    2: optional i64 mem_usage
+}
+
+struct TGetBeResourceResult {
+    1: optional TGlobalResourceUsage global_resource_usage
+}
+
 service BackendService {
     // Called by coord to start asynchronous execution of plan fragment in backend.
     // Returns as soon as all incoming data streams have been set up.
@@ -298,4 +311,6 @@ service BackendService {
     TQueryIngestBinlogResult query_ingest_binlog(1: TQueryIngestBinlogRequest query_ingest_binlog_request);
 
     TPublishTopicResult publish_topic_info(1:TPublishTopicRequest topic_request);
+
+    TGetBeResourceResult get_be_resource(1: TGetBeResourceRequest request);
 }
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 70d4a550890..ec34b489e0b 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -325,7 +325,7 @@ suite("test_crud_wlg") {
     test {
         sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from ${table_name};"
 
-        exception "query wait timeout"
+        exception "query queue timeout"
     }
 
     // test query queue running query/waiting num
@@ -520,7 +520,7 @@ suite("test_crud_wlg") {
     sql "create workload group if not exists bypass_group properties (  'max_concurrency'='0','max_queue_size'='0','queue_timeout'='0');"
     sql "set workload_group=bypass_group;"
     test {
-        sql "select count(1) from information_schema.active_queries;"
+        sql "select count(1) from ${table_name};"
         exception "query waiting queue is full"
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to