This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6f68ec9de0 support query queue (#20048) 6f68ec9de0 is described below commit 6f68ec9de0d67846ccf60f0686a3f155d92ea63e Author: wangbo <wan...@apache.org> AuthorDate: Tue May 30 19:52:27 2023 +0800 support query queue (#20048) support query queue (#20048) --- .../main/java/org/apache/doris/common/Config.java | 3 + .../java/org/apache/doris/qe/StmtExecutor.java | 28 ++++- .../doris/resource/resourcegroup/QueryQueue.java | 117 +++++++++++++++++++++ .../resource/resourcegroup/QueueOfferToken.java | 46 ++++++++ .../resource/resourcegroup/ResourceGroup.java | 107 +++++++++++++++++-- .../resource/resourcegroup/ResourceGroupMgr.java | 13 +++ .../resource/resourcegroup/ResourceGroupTest.java | 4 +- 7 files changed, 309 insertions(+), 9 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 04d0d22e6b..23ecd187ab 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1492,6 +1492,9 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true, expType = ExperimentalType.EXPERIMENTAL) public static boolean enable_resource_group = false; + @ConfField(mutable = true) + public static boolean enable_query_queue = true; + @ConfField(mutable = false, masterOnly = true) public static int backend_rpc_timeout_ms = 60000; // 1 min diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 79a69ac3d3..458102dc6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -135,6 +135,8 @@ import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.cache.Cache; import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode; +import org.apache.doris.resource.resourcegroup.QueryQueue; +import org.apache.doris.resource.resourcegroup.QueueOfferToken; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.RpcException; @@ -204,6 +206,9 @@ public class StmtExecutor { private OriginStatement originStmt; private StatementBase parsedStmt; private Analyzer analyzer; + private QueryQueue queryQueue = null; + // by default, false means no query queued, then no need to poll when query finish + private QueueOfferToken offerRet = new QueueOfferToken(false); private ProfileType profileType = ProfileType.QUERY; private volatile Coordinator coord = null; private MasterOpExecutor masterOpExecutor = null; @@ -552,6 +557,24 @@ public class StmtExecutor { } private void handleQueryWithRetry(TUniqueId queryId) throws Exception { + // queue query here + if (!parsedStmt.isExplain() && Config.enable_resource_group && Config.enable_query_queue) { + this.queryQueue = analyzer.getEnv().getResourceGroupMgr() + .getResourceGroupQueryQueue(context.sessionVariable.resourceGroup); + try { + this.offerRet = queryQueue.offer(); + } catch (InterruptedException e) { + // this Exception means try lock/await failed, so no need to handle offer result + LOG.error("error happens when offer queue, query id=" + DebugUtil.printId(queryId) + " ", e); + throw new RuntimeException("interrupted Exception happens when queue query"); + } + if (!offerRet.isOfferSuccess()) { + String retMsg = "queue failed, reason=" + offerRet.getOfferResultDetail(); + LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + retMsg); + throw new UserException(retMsg); + } + } + int retryTime = Config.max_query_retry_time; for (int i = 0; i < retryTime; i++) { try { @@ -621,6 +644,9 @@ public class StmtExecutor { throw e; } finally { queryAnalysisSpan.end(); + if (offerRet.isOfferSuccess()) { + queryQueue.poll(); + } } if (isForwardToMaster()) { if (isProxy) { @@ -801,7 +827,7 @@ public class StmtExecutor { } // Analyze one statement to structure in memory. - public void analyze(TQueryOptions tQueryOptions) throws UserException { + public void analyze(TQueryOptions tQueryOptions) throws UserException, InterruptedException { if (LOG.isDebugEnabled()) { LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), context.getForwardedStmtId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueryQueue.java new file mode 100644 index 0000000000..4b464bd94d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueryQueue.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.resourcegroup; + +import com.google.common.base.Preconditions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +// note(wb) refer java BlockingQueue, but support altering capacity +// todo(wb) add wait time to profile +public class QueryQueue { + + private static final Logger LOG = LogManager.getLogger(QueryQueue.class); + // note(wb) used unfair by default, need more test later + private final ReentrantLock queueLock = new ReentrantLock(); + private final Condition queueLockCond = queueLock.newCondition(); + // resource group property + private int maxConcurrency; + private int maxQueueSize; + private int queueTimeout; // ms + // running property + private int currentRunningQueryNum; + private int currentWaitingQueryNum; + + public QueryQueue(int maxConcurrency, int maxQueueSize, int queueTimeout) { + this.maxConcurrency = maxConcurrency; + this.maxQueueSize = maxQueueSize; + this.queueTimeout = queueTimeout; + } + + public String debugString() { + return "maxConcurrency=" + maxConcurrency + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout + + ", currentRunningQueryNum=" + currentRunningQueryNum + ", currentWaitingQueryNum=" + + currentWaitingQueryNum; + } + + public QueueOfferToken offer() throws InterruptedException { + // to prevent hang + // the lock shouldn't be hold for too long + // we should catch the case when it happens + queueLock.tryLock(5, TimeUnit.SECONDS); + try { + // currentRunningQueryNum may bigger than maxRunningQueryNum + // because maxRunningQueryNum can be altered + if (currentRunningQueryNum >= maxConcurrency) { + if (currentWaitingQueryNum >= maxQueueSize) { + LOG.debug(this.debugString()); + return new QueueOfferToken(false, "query waiting queue is full, queue length=" + maxQueueSize); + } + + currentWaitingQueryNum++; + boolean ret; + try { + ret = queueLockCond.await(queueTimeout, TimeUnit.MILLISECONDS); + } finally { + currentWaitingQueryNum--; + } + if (!ret) { + LOG.debug(this.debugString()); + return new QueueOfferToken(false, "query wait timeout " + queueTimeout + " ms"); + } + } + currentRunningQueryNum++; + return new QueueOfferToken(true, "offer success"); + } finally { + queueLock.unlock(); + } + } + + public void poll() throws InterruptedException { + queueLock.tryLock(5, TimeUnit.SECONDS); + try { + currentRunningQueryNum--; + Preconditions.checkArgument(currentRunningQueryNum >= 0); + // maybe only when currentWaitingQueryNum != 0 need to signal + queueLockCond.signal(); + } finally { + queueLock.unlock(); + } + } + + public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWaitTimeout) { + try { + queueLock.tryLock(5, TimeUnit.SECONDS); + try { + this.maxConcurrency = maxConcurrency; + this.maxQueueSize = maxQueueSize; + this.queueTimeout = queryWaitTimeout; + } finally { + queueLock.unlock(); + } + } catch (InterruptedException e) { + LOG.error("reset queue property failed, ", e); + throw new RuntimeException("reset queue property failed, reason=" + e.getMessage()); + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueueOfferToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueueOfferToken.java new file mode 100644 index 0000000000..4096a1095d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/QueueOfferToken.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.resourcegroup; + +// used to mark QueryQueue offer result +// if offer failed, then need to cancel query +// and return failed reason to user client +public class QueueOfferToken { + + private Boolean offerResult; + + private String offerResultDetail; + + public QueueOfferToken(Boolean offerResult) { + this.offerResult = offerResult; + } + + public QueueOfferToken(Boolean offerResult, String offerResultDetail) { + this.offerResult = offerResult; + this.offerResultDetail = offerResultDetail; + } + + public Boolean isOfferSuccess() { + return offerResult; + } + + public String getOfferResultDetail() { + return offerResultDetail; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java index 6d34cb7595..039209df44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java @@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.proc.BaseProcResult; +import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TPipelineResourceGroup; @@ -39,7 +40,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -public class ResourceGroup implements Writable { +public class ResourceGroup implements Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(ResourceGroup.class); public static final String CPU_SHARE = "cpu_share"; @@ -48,11 +49,18 @@ public class ResourceGroup implements Writable { public static final String ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit"; + public static final String MAX_CONCURRENCY = "max_concurrency"; + + public static final String MAX_QUEUE_SIZE = "max_queue_size"; + + public static final String QUEUE_TIMEOUT = "queue_timeout"; + private static final ImmutableSet<String> REQUIRED_PROPERTIES_NAME = new ImmutableSet.Builder<String>().add( CPU_SHARE).add(MEMORY_LIMIT).build(); - private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>().add( - CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).build(); + private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>() + .add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY) + .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).build(); @SerializedName(value = "id") private long id; @@ -69,6 +77,11 @@ public class ResourceGroup implements Writable { private double memoryLimitPercent; + private QueryQueue queryQueue; + private int maxConcurrency = Integer.MAX_VALUE; + private int maxQueueSize = 0; + private int queueTimeout = 0; + private ResourceGroup(long id, String name, Map<String, String> properties) { this(id, name, properties, 0); } @@ -85,11 +98,54 @@ public class ResourceGroup implements Writable { } } + // called when first create a resource group, load from image or user new create a group + public void initQueryQueue() { + resetQueueProperty(properties); + // if query queue property is not set, when use default value + this.queryQueue = new QueryQueue(maxConcurrency, maxQueueSize, queueTimeout); + } + + void resetQueryQueue(QueryQueue queryQueue) { + resetQueueProperty(properties); + this.queryQueue = queryQueue; + this.queryQueue.resetQueueProperty(this.maxConcurrency, this.maxQueueSize, this.queueTimeout); + + } + + private void resetQueueProperty(Map<String, String> properties) { + if (properties.containsKey(MAX_CONCURRENCY)) { + this.maxConcurrency = Integer.parseInt(properties.get(MAX_CONCURRENCY)); + } else { + this.maxConcurrency = Integer.MAX_VALUE; + properties.put(MAX_CONCURRENCY, String.valueOf(this.maxConcurrency)); + } + if (properties.containsKey(MAX_QUEUE_SIZE)) { + this.maxQueueSize = Integer.parseInt(properties.get(MAX_QUEUE_SIZE)); + } else { + this.maxQueueSize = 0; + properties.put(MAX_QUEUE_SIZE, String.valueOf(maxQueueSize)); + } + if (properties.containsKey(QUEUE_TIMEOUT)) { + this.queueTimeout = Integer.parseInt(properties.get(QUEUE_TIMEOUT)); + } else { + this.queueTimeout = 0; + properties.put(QUEUE_TIMEOUT, String.valueOf(queueTimeout)); + } + } + + public QueryQueue getQueryQueue() { + return this.queryQueue; + } + + // new resource group public static ResourceGroup create(String name, Map<String, String> properties) throws DdlException { checkProperties(properties); - return new ResourceGroup(Env.getCurrentEnv().getNextId(), name, properties); + ResourceGroup newResourceGroup = new ResourceGroup(Env.getCurrentEnv().getNextId(), name, properties); + newResourceGroup.initQueryQueue(); + return newResourceGroup; } + // alter resource group public static ResourceGroup copyAndUpdate(ResourceGroup resourceGroup, Map<String, String> updateProperties) throws DdlException { Map<String, String> newProperties = new HashMap<>(resourceGroup.getProperties()); @@ -100,8 +156,13 @@ public class ResourceGroup implements Writable { } checkProperties(newProperties); - return new ResourceGroup( - resourceGroup.getId(), resourceGroup.getName(), newProperties, resourceGroup.getVersion() + 1); + ResourceGroup newResourceGroup = new ResourceGroup( + resourceGroup.getId(), resourceGroup.getName(), newProperties, resourceGroup.getVersion() + 1); + + // note(wb) query queue should be unique and can not be copy + newResourceGroup.resetQueryQueue(resourceGroup.getQueryQueue()); + + return newResourceGroup; } private static void checkProperties(Map<String, String> properties) throws DdlException { @@ -141,6 +202,35 @@ public class ResourceGroup implements Writable { throw new DdlException("The value of '" + ENABLE_MEMORY_OVERCOMMIT + "' must be true or false."); } } + + // check queue property + if (properties.containsKey(MAX_CONCURRENCY)) { + try { + if (Integer.parseInt(properties.get(MAX_CONCURRENCY)) < 0) { + throw new DdlException(MAX_CONCURRENCY + " requires a positive integer"); + } + } catch (NumberFormatException e) { + throw new DdlException(MAX_CONCURRENCY + " requires a positive integer"); + } + } + if (properties.containsKey(MAX_QUEUE_SIZE)) { + try { + if (Integer.parseInt(properties.get(MAX_QUEUE_SIZE)) < 0) { + throw new DdlException(MAX_QUEUE_SIZE + " requires a positive integer"); + } + } catch (NumberFormatException e) { + throw new DdlException(MAX_QUEUE_SIZE + " requires a positive integer"); + } + } + if (properties.containsKey(QUEUE_TIMEOUT)) { + try { + if (Integer.parseInt(properties.get(QUEUE_TIMEOUT)) < 0) { + throw new DdlException(QUEUE_TIMEOUT + " requires a positive integer"); + } + } catch (NumberFormatException e) { + throw new DdlException(QUEUE_TIMEOUT + " requires a positive integer"); + } + } } public long getId() { @@ -188,4 +278,9 @@ public class ResourceGroup implements Writable { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, ResourceGroup.class); } + + @Override + public void gsonPostProcess() throws IOException { + this.initQueryQueue(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java index f83a9a1678..a11907f34c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java @@ -114,6 +114,19 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable { return resourceGroups; } + public QueryQueue getResourceGroupQueryQueue(String groupName) throws UserException { + readLock(); + try { + ResourceGroup resourceGroup = nameToResourceGroup.get(groupName); + if (resourceGroup == null) { + throw new UserException("Resource group " + groupName + " does not exist"); + } + return resourceGroup.getQueryQueue(); + } finally { + readUnlock(); + } + } + private void checkAndCreateDefaultGroup() { ResourceGroup defaultResourceGroup = null; writeLock(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java index 9f174e201c..ff7199aa67 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java @@ -37,7 +37,7 @@ public class ResourceGroupTest { String name1 = "g1"; ResourceGroup group1 = ResourceGroup.create(name1, properties1); Assert.assertEquals(name1, group1.getName()); - Assert.assertEquals(2, group1.getProperties().size()); + Assert.assertEquals(5, group1.getProperties().size()); Assert.assertTrue(group1.getProperties().containsKey(ResourceGroup.CPU_SHARE)); Assert.assertTrue(Math.abs(group1.getMemoryLimitPercent() - 30) < 1e-6); } @@ -92,6 +92,6 @@ public class ResourceGroupTest { BaseProcResult result = new BaseProcResult(); group1.getProcNodeData(result); List<List<String>> rows = result.getRows(); - Assert.assertEquals(2, rows.size()); + Assert.assertEquals(5, rows.size()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org