This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f68ec9de0 support query queue (#20048)
6f68ec9de0 is described below

commit 6f68ec9de0d67846ccf60f0686a3f155d92ea63e
Author: wangbo <wan...@apache.org>
AuthorDate: Tue May 30 19:52:27 2023 +0800

    support query queue (#20048)
    
    support query queue (#20048)
---
 .../main/java/org/apache/doris/common/Config.java  |   3 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |  28 ++++-
 .../doris/resource/resourcegroup/QueryQueue.java   | 117 +++++++++++++++++++++
 .../resource/resourcegroup/QueueOfferToken.java    |  46 ++++++++
 .../resource/resourcegroup/ResourceGroup.java      | 107 +++++++++++++++++--
 .../resource/resourcegroup/ResourceGroupMgr.java   |  13 +++
 .../resource/resourcegroup/ResourceGroupTest.java  |   4 +-
 7 files changed, 309 insertions(+), 9 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 04d0d22e6b..23ecd187ab 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1492,6 +1492,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true, expType = 
ExperimentalType.EXPERIMENTAL)
     public static boolean enable_resource_group = false;
 
+    @ConfField(mutable = true)
+    public static boolean enable_query_queue = true;
+
     @ConfField(mutable = false, masterOnly = true)
     public static int backend_rpc_timeout_ms = 60000; // 1 min
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 79a69ac3d3..458102dc6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -135,6 +135,8 @@ import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
 import org.apache.doris.qe.cache.CacheAnalyzer;
 import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
+import org.apache.doris.resource.resourcegroup.QueryQueue;
+import org.apache.doris.resource.resourcegroup.QueueOfferToken;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.rpc.RpcException;
@@ -204,6 +206,9 @@ public class StmtExecutor {
     private OriginStatement originStmt;
     private StatementBase parsedStmt;
     private Analyzer analyzer;
+    private QueryQueue queryQueue = null;
+    // by default, false means no query queued, then no need to poll when 
query finish
+    private QueueOfferToken offerRet = new QueueOfferToken(false);
     private ProfileType profileType = ProfileType.QUERY;
     private volatile Coordinator coord = null;
     private MasterOpExecutor masterOpExecutor = null;
@@ -552,6 +557,24 @@ public class StmtExecutor {
     }
 
     private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
+        // queue query here
+        if (!parsedStmt.isExplain() && Config.enable_resource_group && 
Config.enable_query_queue) {
+            this.queryQueue = analyzer.getEnv().getResourceGroupMgr()
+                    
.getResourceGroupQueryQueue(context.sessionVariable.resourceGroup);
+            try {
+                this.offerRet = queryQueue.offer();
+            } catch (InterruptedException e) {
+                // this Exception means try lock/await failed, so no need to 
handle offer result
+                LOG.error("error happens when offer queue, query id=" + 
DebugUtil.printId(queryId) + " ", e);
+                throw new RuntimeException("interrupted Exception happens when 
queue query");
+            }
+            if (!offerRet.isOfferSuccess()) {
+                String retMsg = "queue failed, reason=" + 
offerRet.getOfferResultDetail();
+                LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + 
retMsg);
+                throw new UserException(retMsg);
+            }
+        }
+
         int retryTime = Config.max_query_retry_time;
         for (int i = 0; i < retryTime; i++) {
             try {
@@ -621,6 +644,9 @@ public class StmtExecutor {
                     throw e;
                 } finally {
                     queryAnalysisSpan.end();
+                    if (offerRet.isOfferSuccess()) {
+                        queryQueue.poll();
+                    }
                 }
                 if (isForwardToMaster()) {
                     if (isProxy) {
@@ -801,7 +827,7 @@ public class StmtExecutor {
     }
 
     // Analyze one statement to structure in memory.
-    public void analyze(TQueryOptions tQueryOptions) throws UserException {
+    public void analyze(TQueryOptions tQueryOptions) throws UserException, 
InterruptedException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", 
context.getStmtId(),
                     context.getForwardedStmtId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueryQueue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueryQueue.java
new file mode 100644
index 0000000000..4b464bd94d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueryQueue.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.resourcegroup;
+
+import com.google.common.base.Preconditions;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Admission queue that bounds how many queries of one resource group run at the same time.
+ *
+ * <p>{@link #offer()} admits a query immediately while fewer than {@code maxConcurrency} queries are
+ * running. Otherwise the caller waits for at most {@code queueTimeout} milliseconds, provided fewer
+ * than {@code maxQueueSize} callers are already waiting. {@link #poll()} gives a slot back.
+ * Counters and limits are only modified while {@code queueLock} is held.
+ */
+// note(wb) refer java BlockingQueue, but support altering capacity
+// todo(wb) add wait time to profile
+public class QueryQueue {
+
+    private static final Logger LOG = LogManager.getLogger(QueryQueue.class);
+    // upper bound for acquiring queueLock: the lock should never be held for long,
+    // so a timeout is reported as a failure instead of hanging the caller
+    private static final long LOCK_TIMEOUT_SECONDS = 5;
+    // note(wb) used unfair by default, need more test later
+    private final ReentrantLock queueLock = new ReentrantLock();
+    private final Condition queueLockCond = queueLock.newCondition();
+    // resource group property
+    private int maxConcurrency;
+    private int maxQueueSize;
+    private int queueTimeout; // ms
+    // running property
+    private int currentRunningQueryNum;
+    private int currentWaitingQueryNum;
+
+    /**
+     * @param maxConcurrency max number of queries admitted at the same time
+     * @param maxQueueSize max number of callers allowed to wait for a slot
+     * @param queueTimeout max time in milliseconds a caller waits for a slot
+     */
+    public QueryQueue(int maxConcurrency, int maxQueueSize, int queueTimeout) {
+        this.maxConcurrency = maxConcurrency;
+        this.maxQueueSize = maxQueueSize;
+        this.queueTimeout = queueTimeout;
+    }
+
+    /** Returns limits and counters as one line; read without the lock, for logging only. */
+    public String debugString() {
+        return "maxConcurrency=" + maxConcurrency + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout
+                + ", currentRunningQueryNum=" + currentRunningQueryNum + ", currentWaitingQueryNum="
+                + currentWaitingQueryNum;
+    }
+
+    /**
+     * Tries to take a running slot, waiting in the queue if all slots are busy.
+     *
+     * @return a token whose {@code isOfferSuccess()} is true when the query was admitted; otherwise the
+     *         token detail says why (lock not acquired, waiting queue full, or wait timeout)
+     * @throws InterruptedException if the thread is interrupted while acquiring the lock or waiting
+     */
+    public QueueOfferToken offer() throws InterruptedException {
+        // tryLock reports a timeout through its return value; continuing without the lock would
+        // race on the counters and make unlock() throw IllegalMonitorStateException
+        if (!queueLock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+            return new QueueOfferToken(false,
+                    "failed to acquire query queue lock in " + LOCK_TIMEOUT_SECONDS + " seconds");
+        }
+        try {
+            // currentRunningQueryNum may be bigger than maxConcurrency
+            // because maxConcurrency can be altered
+            if (currentRunningQueryNum >= maxConcurrency) {
+                if (currentWaitingQueryNum >= maxQueueSize) {
+                    LOG.debug(this.debugString());
+                    return new QueueOfferToken(false, "query waiting queue is full, queue length=" + maxQueueSize);
+                }
+
+                final int waitTimeoutMs = queueTimeout;
+                long remainingNanos = TimeUnit.MILLISECONDS.toNanos(waitTimeoutMs);
+                boolean timedOut = false;
+                currentWaitingQueryNum++;
+                try {
+                    // re-check after every wakeup: the lock is not fair, so another thread may have
+                    // taken the freed slot already, and condition waits may wake up spuriously
+                    while (currentRunningQueryNum >= maxConcurrency) {
+                        if (remainingNanos <= 0) {
+                            timedOut = true;
+                            break;
+                        }
+                        remainingNanos = queueLockCond.awaitNanos(remainingNanos);
+                    }
+                } finally {
+                    currentWaitingQueryNum--;
+                }
+                if (timedOut) {
+                    LOG.debug(this.debugString());
+                    return new QueueOfferToken(false, "query wait timeout " + waitTimeoutMs + " ms");
+                }
+            }
+            currentRunningQueryNum++;
+            return new QueueOfferToken(true, "offer success");
+        } finally {
+            queueLock.unlock();
+        }
+    }
+
+    /**
+     * Gives back the slot taken by a successful {@link #offer()} and wakes one waiting caller.
+     *
+     * <p>Uses an unconditional {@code lock()}: losing a release because of a lock timeout or an
+     * interrupt would leak the slot forever. The {@code throws} clause is kept only so existing
+     * callers keep compiling.
+     *
+     * @throws IllegalArgumentException if no query is currently counted as running
+     */
+    public void poll() throws InterruptedException {
+        queueLock.lock();
+        try {
+            // validate before decrementing so a stray poll() cannot drive the counter negative
+            Preconditions.checkArgument(currentRunningQueryNum > 0,
+                    "poll() called while no query is running in the queue");
+            currentRunningQueryNum--;
+            // maybe only when currentWaitingQueryNum != 0 need to signal
+            queueLockCond.signal();
+        } finally {
+            queueLock.unlock();
+        }
+    }
+
+    /**
+     * Replaces the queue limits, e.g. after the owning resource group was altered.
+     *
+     * @throws RuntimeException if the lock cannot be acquired in time or the thread is interrupted
+     */
+    public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWaitTimeout) {
+        boolean locked;
+        try {
+            locked = queueLock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller's thread and keep the cause in the exception
+            Thread.currentThread().interrupt();
+            LOG.error("reset queue property failed, ", e);
+            throw new RuntimeException("reset queue property failed, reason=" + e.getMessage(), e);
+        }
+        if (!locked) {
+            throw new RuntimeException("reset queue property failed, reason=failed to acquire query queue lock in "
+                    + LOCK_TIMEOUT_SECONDS + " seconds");
+        }
+        try {
+            this.maxConcurrency = maxConcurrency;
+            this.maxQueueSize = maxQueueSize;
+            this.queueTimeout = queryWaitTimeout;
+            // the concurrency limit may have been raised: let every waiter re-check it
+            queueLockCond.signalAll();
+        } finally {
+            queueLock.unlock();
+        }
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueueOfferToken.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueueOfferToken.java
new file mode 100644
index 0000000000..4096a1095d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueueOfferToken.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.resourcegroup;
+
+/**
+ * Immutable result of a QueryQueue offer.
+ * If the offer failed, the query needs to be cancelled and the failure reason
+ * ({@link #getOfferResultDetail()}) returned to the user client.
+ */
+public class QueueOfferToken {
+
+    private final boolean offerResult;
+
+    private final String offerResultDetail;
+
+    /**
+     * Creates a token without a detail message.
+     *
+     * @param offerResult true if the query was admitted; null is treated as false
+     */
+    public QueueOfferToken(Boolean offerResult) {
+        this(offerResult, null);
+    }
+
+    /**
+     * @param offerResult true if the query was admitted; null is treated as false
+     * @param offerResultDetail human readable reason, may be null
+     */
+    public QueueOfferToken(Boolean offerResult, String offerResultDetail) {
+        // stored as a primitive so isOfferSuccess() can never hand out null,
+        // which would throw NullPointerException when callers unbox it in an if-condition
+        this.offerResult = Boolean.TRUE.equals(offerResult);
+        this.offerResultDetail = offerResultDetail;
+    }
+
+    /** Returns true if the query was admitted; never null. */
+    public Boolean isOfferSuccess() {
+        return offerResult;
+    }
+
+    /** Returns the reason attached to this result, or null if none was given. */
+    public String getOfferResultDetail() {
+        return offerResultDetail;
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
index 6d34cb7595..039209df44 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.proc.BaseProcResult;
+import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TPipelineResourceGroup;
 
@@ -39,7 +40,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-public class ResourceGroup implements Writable {
+public class ResourceGroup implements Writable, GsonPostProcessable {
     private static final Logger LOG = 
LogManager.getLogger(ResourceGroup.class);
 
     public static final String CPU_SHARE = "cpu_share";
@@ -48,11 +49,18 @@ public class ResourceGroup implements Writable {
 
     public static final String ENABLE_MEMORY_OVERCOMMIT = 
"enable_memory_overcommit";
 
+    public static final String MAX_CONCURRENCY = "max_concurrency";
+
+    public static final String MAX_QUEUE_SIZE = "max_queue_size";
+
+    public static final String QUEUE_TIMEOUT = "queue_timeout";
+
     private static final ImmutableSet<String> REQUIRED_PROPERTIES_NAME = new 
ImmutableSet.Builder<String>().add(
             CPU_SHARE).add(MEMORY_LIMIT).build();
 
-    private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new 
ImmutableSet.Builder<String>().add(
-            CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).build();
+    private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new 
ImmutableSet.Builder<String>()
+            
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
+            .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).build();
 
     @SerializedName(value = "id")
     private long id;
@@ -69,6 +77,11 @@ public class ResourceGroup implements Writable {
 
     private double memoryLimitPercent;
 
+    private QueryQueue queryQueue;
+    private int maxConcurrency = Integer.MAX_VALUE;
+    private int maxQueueSize = 0;
+    private int queueTimeout = 0;
+
     private ResourceGroup(long id, String name, Map<String, String> 
properties) {
         this(id, name, properties, 0);
     }
@@ -85,11 +98,54 @@ public class ResourceGroup implements Writable {
         }
     }
 
+    /**
+     * Creates a brand-new query queue for this group.
+     * Called when a resource group object is first built: loaded from image or newly created by a user.
+     */
+    public void initQueryQueue() {
+        // refresh maxConcurrency / maxQueueSize / queueTimeout from the property map first;
+        // queue properties that are not set fall back to their default values
+        resetQueueProperty(properties);
+        queryQueue = new QueryQueue(maxConcurrency, maxQueueSize, queueTimeout);
+    }
+
+    // Attaches an already existing queue to this group and pushes this group's
+    // queue settings (read from its properties) into that queue.
+    void resetQueryQueue(QueryQueue queryQueue) {
+        resetQueueProperty(properties);
+        this.queryQueue = queryQueue;
+        queryQueue.resetQueueProperty(maxConcurrency, maxQueueSize, queueTimeout);
+    }
+
+    // Reloads the three queue settings from the given property map.
+    // A setting missing from the map gets its default, and that default is written back into the map.
+    private void resetQueueProperty(Map<String, String> properties) {
+        this.maxConcurrency = intPropertyOrDefault(properties, MAX_CONCURRENCY, Integer.MAX_VALUE);
+        this.maxQueueSize = intPropertyOrDefault(properties, MAX_QUEUE_SIZE, 0);
+        this.queueTimeout = intPropertyOrDefault(properties, QUEUE_TIMEOUT, 0);
+    }
+
+    // Parses properties[key] as an int; when the key is absent, stores defaultValue under it and returns it.
+    private static int intPropertyOrDefault(Map<String, String> properties, String key, int defaultValue) {
+        if (!properties.containsKey(key)) {
+            properties.put(key, String.valueOf(defaultValue));
+            return defaultValue;
+        }
+        return Integer.parseInt(properties.get(key));
+    }
+
+    /** Returns the query queue currently attached to this resource group. */
+    public QueryQueue getQueryQueue() {
+        return queryQueue;
+    }
+
+    // new resource group
     public static ResourceGroup create(String name, Map<String, String> 
properties) throws DdlException {
         checkProperties(properties);
-        return new ResourceGroup(Env.getCurrentEnv().getNextId(), name, 
properties);
+        ResourceGroup newResourceGroup = new 
ResourceGroup(Env.getCurrentEnv().getNextId(), name, properties);
+        newResourceGroup.initQueryQueue();
+        return newResourceGroup;
     }
 
+    // alter resource group
     public static ResourceGroup copyAndUpdate(ResourceGroup resourceGroup, 
Map<String, String> updateProperties)
             throws DdlException {
         Map<String, String> newProperties = new 
HashMap<>(resourceGroup.getProperties());
@@ -100,8 +156,13 @@ public class ResourceGroup implements Writable {
         }
 
         checkProperties(newProperties);
-        return new ResourceGroup(
-           resourceGroup.getId(), resourceGroup.getName(), newProperties, 
resourceGroup.getVersion() + 1);
+        ResourceGroup newResourceGroup = new ResourceGroup(
+                resourceGroup.getId(), resourceGroup.getName(), newProperties, 
resourceGroup.getVersion() + 1);
+
+        // note(wb) query queue should be unique and can not be copy
+        newResourceGroup.resetQueryQueue(resourceGroup.getQueryQueue());
+
+        return newResourceGroup;
     }
 
     private static void checkProperties(Map<String, String> properties) throws 
DdlException {
@@ -141,6 +202,35 @@ public class ResourceGroup implements Writable {
                 throw new DdlException("The value of '" + 
ENABLE_MEMORY_OVERCOMMIT + "' must be true or false.");
             }
         }
+
+        // check queue property
+        if (properties.containsKey(MAX_CONCURRENCY)) {
+            try {
+                if (Integer.parseInt(properties.get(MAX_CONCURRENCY)) < 0) {
+                    throw new DdlException(MAX_CONCURRENCY + " requires a 
positive integer");
+                }
+            } catch (NumberFormatException e) {
+                throw new DdlException(MAX_CONCURRENCY + " requires a positive 
integer");
+            }
+        }
+        if (properties.containsKey(MAX_QUEUE_SIZE)) {
+            try {
+                if (Integer.parseInt(properties.get(MAX_QUEUE_SIZE)) < 0) {
+                    throw new DdlException(MAX_QUEUE_SIZE + " requires a 
positive integer");
+                }
+            } catch (NumberFormatException e) {
+                throw new DdlException(MAX_QUEUE_SIZE + " requires a positive 
integer");
+            }
+        }
+        if (properties.containsKey(QUEUE_TIMEOUT)) {
+            try {
+                if (Integer.parseInt(properties.get(QUEUE_TIMEOUT)) < 0) {
+                    throw new DdlException(QUEUE_TIMEOUT + " requires a 
positive integer");
+                }
+            } catch (NumberFormatException e) {
+                throw new DdlException(QUEUE_TIMEOUT + " requires a positive 
integer");
+            }
+        }
     }
 
     public long getId() {
@@ -188,4 +278,9 @@ public class ResourceGroup implements Writable {
         String json = Text.readString(in);
         return GsonUtils.GSON.fromJson(json, ResourceGroup.class);
     }
+
+    /**
+     * GsonPostProcessable callback (presumably run after Gson has rebuilt this object - confirm against
+     * GsonUtils): creates the query queue from the restored properties.
+     */
+    @Override
+    public void gsonPostProcess() throws IOException {
+        initQueryQueue();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
index f83a9a1678..a11907f34c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
@@ -114,6 +114,19 @@ public class ResourceGroupMgr implements Writable, 
GsonPostProcessable {
         return resourceGroups;
     }
 
+    /**
+     * Looks up the query queue of the named resource group while holding the read lock.
+     *
+     * @param groupName name of the resource group
+     * @return the query queue of that group
+     * @throws UserException if no resource group is registered under that name
+     */
+    public QueryQueue getResourceGroupQueryQueue(String groupName) throws UserException {
+        readLock();
+        try {
+            ResourceGroup group = nameToResourceGroup.get(groupName);
+            if (group != null) {
+                return group.getQueryQueue();
+            }
+            throw new UserException("Resource group " + groupName + " does not exist");
+        } finally {
+            readUnlock();
+        }
+    }
+
     private void checkAndCreateDefaultGroup() {
         ResourceGroup defaultResourceGroup = null;
         writeLock();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
index 9f174e201c..ff7199aa67 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
@@ -37,7 +37,7 @@ public class ResourceGroupTest {
         String name1 = "g1";
         ResourceGroup group1 = ResourceGroup.create(name1, properties1);
         Assert.assertEquals(name1, group1.getName());
-        Assert.assertEquals(2, group1.getProperties().size());
+        Assert.assertEquals(5, group1.getProperties().size());
         
Assert.assertTrue(group1.getProperties().containsKey(ResourceGroup.CPU_SHARE));
         Assert.assertTrue(Math.abs(group1.getMemoryLimitPercent() - 30) < 
1e-6);
     }
@@ -92,6 +92,6 @@ public class ResourceGroupTest {
         BaseProcResult result = new BaseProcResult();
         group1.getProcNodeData(result);
         List<List<String>> rows = result.getRows();
-        Assert.assertEquals(2, rows.size());
+        Assert.assertEquals(5, rows.size());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to