This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 1c566253a81 [Pick][Improment]Query queued by be memory (#37559) (#39733) 1c566253a81 is described below commit 1c566253a819ffa6b91eb8fb2925b792520bff26 Author: wangbo <wan...@apache.org> AuthorDate: Thu Aug 22 15:14:47 2024 +0800 [Pick][Improment]Query queued by be memory (#37559) (#39733) pick #37559 --- be/src/service/backend_service.cpp | 11 ++ be/src/service/backend_service.h | 3 + .../main/java/org/apache/doris/common/Config.java | 11 ++ .../main/java/org/apache/doris/catalog/Env.java | 9 ++ .../main/java/org/apache/doris/qe/Coordinator.java | 43 ++++-- .../java/org/apache/doris/qe/QeProcessorImpl.java | 7 +- .../apache/doris/resource/AdmissionControl.java | 156 +++++++++++++++++++++ .../doris/resource/workloadgroup/QueryQueue.java | 101 +++++++------ .../doris/resource/workloadgroup/QueueToken.java | 152 +++++++------------- .../resource/workloadgroup/WorkloadGroup.java | 6 +- .../doris/tablefunction/MetadataGenerator.java | 11 +- .../org/apache/doris/common/GenericPoolTest.java | 7 + .../apache/doris/utframe/MockedBackendFactory.java | 7 + gensrc/thrift/BackendService.thrift | 15 ++ .../workload_manager_p0/test_curd_wlg.groovy | 4 +- 15 files changed, 367 insertions(+), 176 deletions(-) diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index e26264b1a22..c30b936769a 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -1110,4 +1110,15 @@ void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result, break; } } + +void BackendService::get_be_resource(TGetBeResourceResult& result, + const TGetBeResourceRequest& request) { + int64_t mem_usage = PerfCounters::get_vm_rss(); + int64_t mem_limit = MemInfo::mem_limit(); + TGlobalResourceUsage global_resource_usage; + global_resource_usage.__set_mem_limit(mem_limit); + global_resource_usage.__set_mem_usage(mem_usage); + 
result.__set_global_resource_usage(global_resource_usage); +} + } // namespace doris diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 9d53ec4bc45..bbcf103167f 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -140,6 +140,9 @@ public: void query_ingest_binlog(TQueryIngestBinlogResult& result, const TQueryIngestBinlogRequest& request) override; + void get_be_resource(TGetBeResourceResult& result, + const TGetBeResourceRequest& request) override; + private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); ExecEnv* _exec_env = nullptr; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 6a0394a0be7..5e0cb99b638 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1782,6 +1782,17 @@ public class Config extends ConfigBase { @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL) public static boolean enable_cpu_hard_limit = false; + @ConfField(mutable = true, description = { + "当BE内存用量大于该值时,查询会进入排队逻辑,默认值为-1,代表该值不生效。取值范围0~1的小数", + "When be memory usage bigger than this value, query could queue, " + + "default value is -1, means this value not work. 
Decimal value range from 0 to 1"}) + public static double query_queue_by_be_used_memory = -1; + + @ConfField(mutable = true, description = {"基于内存反压场景FE定时拉取BE内存用量的时间间隔", + "In the scenario of memory backpressure, " + + "the time interval for obtaining BE memory usage at regular intervals"}) + public static long get_be_resource_usage_interval_ms = 10000; + @ConfField(mutable = false, masterOnly = true) public static int backend_rpc_timeout_ms = 60000; // 1 min diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 785b7add3a0..d6af35c5bf1 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -242,6 +242,7 @@ import org.apache.doris.qe.JournalObservable; import org.apache.doris.qe.QueryCancelWorker; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.VariableMgr; +import org.apache.doris.resource.AdmissionControl; import org.apache.doris.resource.Tag; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr; @@ -515,6 +516,8 @@ public class Env { private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr; + private AdmissionControl admissionControl; + private QueryStats queryStats; private StatisticsCleaner statisticsCleaner; @@ -772,6 +775,7 @@ public class Env { this.workloadGroupMgr = new WorkloadGroupMgr(); this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr(); this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr(); + this.admissionControl = new AdmissionControl(systemInfo); this.queryStats = new QueryStats(); this.loadManagerAdapter = new LoadManagerAdapter(); this.hiveTransactionMgr = new HiveTransactionMgr(); @@ -883,6 +887,10 @@ public class Env { return workloadRuntimeStatusMgr; } + public AdmissionControl getAdmissionControl() { + return admissionControl; + } + public 
ExternalMetaIdMgr getExternalMetaIdMgr() { return externalMetaIdMgr; } @@ -1747,6 +1755,7 @@ public class Env { workloadGroupMgr.start(); workloadSchedPolicyMgr.start(); workloadRuntimeStatusMgr.start(); + admissionControl.start(); splitSourceManager.start(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 19ca1050469..d676ec5b908 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -62,6 +62,7 @@ import org.apache.doris.planner.ResultSink; import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.SchemaScanNode; import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.SortNode; import org.apache.doris.planner.UnionNode; @@ -645,6 +646,22 @@ public class Coordinator implements CoordInterface { return fragmentParams; } + private boolean shouldQueue() { + boolean ret = Config.enable_query_queue && !context.getSessionVariable() + .getBypassWorkloadGroup() && !isQueryCancelled(); + if (!ret) { + return false; + } + // a query with ScanNode need not queue only when all its scan node is SchemaScanNode + for (ScanNode scanNode : this.scanNodes) { + boolean isSchemaScanNode = scanNode instanceof SchemaScanNode; + if (!isSchemaScanNode) { + return true; + } + } + return false; + } + // Initiate asynchronous execution of query. Returns as soon as all plan fragments // have started executing at their respective backends. 
// 'Request' must contain at least a coordinator plan fragment (ie, can't @@ -656,8 +673,7 @@ public class Coordinator implements CoordInterface { if (context != null) { if (Config.enable_workload_group) { this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context)); - boolean shouldQueue = Config.enable_query_queue && !context.getSessionVariable() - .getBypassWorkloadGroup() && !isQueryCancelled(); + boolean shouldQueue = this.shouldQueue(); if (shouldQueue) { queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); if (queryQueue == null) { @@ -666,11 +682,8 @@ public class Coordinator implements CoordInterface { throw new UserException("could not find query queue"); } queueToken = queryQueue.getToken(); - if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) { - LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail()); - queryQueue.returnToken(queueToken); - throw new UserException(queueToken.getOfferResultDetail()); - } + queueToken.get(DebugUtil.printId(queryId), + this.queryOptions.getExecutionTimeout() * 1000); } } else { context.setWorkloadGroupName(""); @@ -681,16 +694,22 @@ public class Coordinator implements CoordInterface { @Override public void close() { - for (ScanNode scanNode : scanNodes) { - scanNode.stop(); - } + // NOTE: all close method should be no exception if (queryQueue != null && queueToken != null) { try { - queryQueue.returnToken(queueToken); + queryQueue.releaseAndNotify(queueToken); } catch (Throwable t) { LOG.error("error happens when coordinator close ", t); } } + + try { + for (ScanNode scanNode : scanNodes) { + scanNode.stop(); + } + } catch (Throwable t) { + LOG.error("error happens when scannode stop ", t); + } } private void execInternal() throws Exception { @@ -1516,7 +1535,7 @@ public class Coordinator implements CoordInterface { public void cancel() { cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, 
"user cancel"); if (queueToken != null) { - queueToken.signalForCancel(); + queueToken.cancel(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 26bb3d95db3..1ec23257749 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -25,7 +25,6 @@ import org.apache.doris.common.profile.ExecutionProfile; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ProfileManager; import org.apache.doris.metric.MetricRepo; -import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReportExecStatusParams; @@ -333,11 +332,11 @@ public final class QeProcessorImpl implements QeProcessor { return -1; } - public TokenState getQueueStatus() { + public String getQueueStatus() { if (coord.getQueueToken() != null) { - return coord.getQueueToken().getTokenState(); + return coord.getQueueToken().getQueueMsg(); } - return null; + return ""; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java new file mode 100644 index 00000000000..480afcde5b6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource; + +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.resource.workloadgroup.QueueToken; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TGetBeResourceRequest; +import org.apache.doris.thrift.TGetBeResourceResult; +import org.apache.doris.thrift.TGlobalResourceUsage; +import org.apache.doris.thrift.TNetworkAddress; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class AdmissionControl extends MasterDaemon { + + public static final Logger LOG = LogManager.getLogger(AdmissionControl.class); + + private volatile boolean isAllBeMemoryEnough = true; + + private double currentMemoryLimit = 0; + + private SystemInfoService clusterInfoService; + + public AdmissionControl(SystemInfoService clusterInfoService) { + super("get-be-resource-usage-thread", Config.get_be_resource_usage_interval_ms); + this.clusterInfoService = clusterInfoService; + } + + private ConcurrentLinkedQueue<QueueToken> queryWaitQueue = new ConcurrentLinkedQueue(); + + public void addQueueToken(QueueToken queryQueue) { + queryWaitQueue.offer(queryQueue); + } + + @Override + protected void runAfterCatalogReady() { + getBeMemoryUsage(); + notifyWaitQuery(); + } + + 
public void getBeMemoryUsage() { + if (Config.query_queue_by_be_used_memory < 0) { + this.isAllBeMemoryEnough = true; + return; + } + Collection<Backend> backends = clusterInfoService.getIdToBackend().values(); + this.currentMemoryLimit = Config.query_queue_by_be_used_memory; + boolean tmpIsAllBeMemoryEnough = true; + for (Backend be : backends) { + if (!be.isAlive()) { + continue; + } + TNetworkAddress address = null; + BackendService.Client client = null; + TGetBeResourceResult result = null; + boolean rpcOk = true; + try { + address = new TNetworkAddress(be.getHost(), be.getBePort()); + client = ClientPool.backendPool.borrowObject(address, 5000); + result = client.getBeResource(new TGetBeResourceRequest()); + } catch (Throwable t) { + rpcOk = false; + LOG.warn("get be {} resource failed, ", be.getHost(), t); + } finally { + try { + if (rpcOk) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } + } catch (Throwable e) { + LOG.warn("return rpc client failed. 
related backend[{}]", be.getHost(), + e); + } + } + if (result != null && result.isSetGlobalResourceUsage()) { + TGlobalResourceUsage globalResourceUsage = result.getGlobalResourceUsage(); + if (globalResourceUsage != null && globalResourceUsage.isSetMemLimit() + && globalResourceUsage.isSetMemUsage()) { + long memUsageL = globalResourceUsage.getMemUsage(); + long memLimitL = globalResourceUsage.getMemLimit(); + double memUsage = Double.valueOf(String.valueOf(memUsageL)); + double memLimit = Double.valueOf(String.valueOf(memLimitL)); + double memUsagePercent = memUsage / memLimit; + + if (memUsagePercent > this.currentMemoryLimit) { + tmpIsAllBeMemoryEnough = false; + } + LOG.debug( + "be ip:{}, mem limit:{}, mem usage:{}, mem usage percent:{}, " + + "query queue mem:{}, query wait size:{}", + be.getHost(), memLimitL, memUsageL, memUsagePercent, this.currentMemoryLimit, + this.queryWaitQueue.size()); + } + } + } + this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough; + } + + public void notifyWaitQuery() { + if (!isAllBeMemoryEnough()) { + return; + } + int waitQueryCountSnapshot = queryWaitQueue.size(); + Iterator<QueueToken> queueTokenIterator = queryWaitQueue.iterator(); + while (waitQueryCountSnapshot > 0 && queueTokenIterator.hasNext()) { + QueueToken queueToken = queueTokenIterator.next(); + queueToken.notifyWaitQuery(); + waitQueryCountSnapshot--; + } + } + + public void removeQueueToken(QueueToken queueToken) { + queryWaitQueue.remove(queueToken); + } + + public boolean isAllBeMemoryEnough() { + return isAllBeMemoryEnough; + } + + //TODO(wb): add more resource type + public boolean checkResourceAvailable(QueueToken queueToken) { + if (isAllBeMemoryEnough()) { + return true; + } else { + queueToken.setQueueMsg("WAIT_BE_MEMORY"); + return false; + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index 5953edbf66e..ba2d2526f2e 
100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -17,14 +17,18 @@ package org.apache.doris.resource.workloadgroup; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.resource.AdmissionControl; import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; -import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.LinkedList; import java.util.PriorityQueue; +import java.util.Queue; import java.util.concurrent.locks.ReentrantLock; // note(wb) refer java BlockingQueue, but support altering capacity @@ -38,8 +42,6 @@ public class QueryQueue { private int maxConcurrency; private int maxQueueSize; private int queueTimeout; // ms - // running property - private volatile int currentRunningQueryNum; public static final String RUNNING_QUERY_NUM = "running_query_num"; public static final String WAITING_QUERY_NUM = "waiting_query_num"; @@ -48,16 +50,13 @@ public class QueryQueue { private long propVersion; - private PriorityQueue<QueueToken> priorityTokenQueue; + private PriorityQueue<QueueToken> waitingQueryQueue; + private Queue<QueueToken> runningQueryQueue; - int getCurrentRunningQueryNum() { - return currentRunningQueryNum; - } - - int getCurrentWaitingQueryNum() { + Pair<Integer, Integer> getQueryQueueDetail() { try { queueLock.lock(); - return priorityTokenQueue.size(); + return Pair.of(runningQueryQueue.size(), waitingQueryQueue.size()); } finally { queueLock.unlock(); } @@ -89,36 +88,47 @@ public class QueryQueue { this.maxQueueSize = maxQueueSize; this.queueTimeout = queueTimeout; this.propVersion = propVersion; - this.priorityTokenQueue = new PriorityQueue<QueueToken>(); + this.waitingQueryQueue = new PriorityQueue<QueueToken>(); + 
this.runningQueryQueue = new LinkedList<QueueToken>(); } public String debugString() { return "wgId= " + wgId + ", version=" + this.propVersion + ",maxConcurrency=" + maxConcurrency - + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout - + ", currentRunningQueryNum=" + currentRunningQueryNum - + ", currentWaitingQueryNum=" + priorityTokenQueue.size(); + + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout + ", currentRunningQueryNum=" + + runningQueryQueue.size() + ", currentWaitingQueryNum=" + waitingQueryQueue.size(); } public QueueToken getToken() throws UserException { + AdmissionControl admissionControl = Env.getCurrentEnv().getAdmissionControl(); queueLock.lock(); try { if (LOG.isDebugEnabled()) { LOG.info(this.debugString()); } - if (currentRunningQueryNum < maxConcurrency) { - currentRunningQueryNum++; - QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success"); - retToken.setQueueTimeWhenOfferSuccess(); - return retToken; - } - if (priorityTokenQueue.size() >= maxQueueSize) { + QueueToken queueToken = new QueueToken(queueTimeout, this); + + boolean isReachMaxCon = runningQueryQueue.size() >= maxConcurrency; + boolean isResourceAvailable = admissionControl.checkResourceAvailable(queueToken); + if (!isReachMaxCon && isResourceAvailable) { + runningQueryQueue.offer(queueToken); + queueToken.complete(); + return queueToken; + } else if (waitingQueryQueue.size() >= maxQueueSize) { throw new UserException("query waiting queue is full, queue length=" + maxQueueSize); + } else { + if (isReachMaxCon) { + queueToken.setQueueMsg("WAIT_IN_QUEUE"); + } + queueToken.setTokenState(TokenState.ENQUEUE_SUCCESS); + this.waitingQueryQueue.offer(queueToken); + // if a query is added to wg's queue but not in AdmissionControl's + // queue may be blocked by be memory later, + // then we should put query to AdmissionControl in releaseAndNotify, it's too complicated. 
+ // To simplify the code logic, put all waiting query to AdmissionControl, + // waiting query can be notified when query finish or memory is enough. + admissionControl.addQueueToken(queueToken); } - QueueToken newQueryToken = new QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout, - "query wait timeout " + queueTimeout + " ms"); - newQueryToken.setQueueTimeWhenQueueSuccess(); - this.priorityTokenQueue.offer(newQueryToken); - return newQueryToken; + return queueToken; } finally { if (LOG.isDebugEnabled()) { LOG.info(this.debugString()); @@ -127,35 +137,36 @@ public class QueryQueue { } } - // If the token is acquired and do work success, then call this method to release it. - public void returnToken(QueueToken token) { + public void notifyWaitQuery() { + releaseAndNotify(null); + } + + public void releaseAndNotify(QueueToken releaseToken) { + AdmissionControl admissionControl = Env.getCurrentEnv().getAdmissionControl(); queueLock.lock(); try { - // If current token is not in ready to run state, then it is still in the queue - // it is not running, just remove it. - if (!token.isReadyToRun()) { - this.priorityTokenQueue.remove(token); - return; - } - currentRunningQueryNum--; - Preconditions.checkArgument(currentRunningQueryNum >= 0); - // If return token and find user changed concurrency num, then maybe need signal - // more tokens. 
- while (currentRunningQueryNum < maxConcurrency) { - QueueToken nextToken = this.priorityTokenQueue.poll(); - if (nextToken != null) { - if (nextToken.signal()) { - ++currentRunningQueryNum; - } + runningQueryQueue.remove(releaseToken); + waitingQueryQueue.remove(releaseToken); + admissionControl.removeQueueToken(releaseToken); + while (runningQueryQueue.size() < maxConcurrency) { + QueueToken queueToken = waitingQueryQueue.peek(); + if (queueToken == null) { + break; + } + if (admissionControl.checkResourceAvailable(queueToken)) { + queueToken.complete(); + runningQueryQueue.offer(queueToken); + waitingQueryQueue.remove(); + admissionControl.removeQueueToken(queueToken); } else { break; } } } finally { + queueLock.unlock(); if (LOG.isDebugEnabled()) { LOG.info(this.debugString()); } - queueLock.unlock(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java index 0a982b81236..748c0c21bda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java @@ -17,13 +17,16 @@ package org.apache.doris.resource.workloadgroup; +import org.apache.doris.common.UserException; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; // used to mark QueryQueue offer result // if offer failed, then need to cancel query @@ -38,134 +41,79 @@ public class QueueToken implements Comparable<QueueToken> { public enum TokenState { ENQUEUE_SUCCESS, - READY_TO_RUN, - CANCELLED + READY_TO_RUN } 
static AtomicLong tokenIdGenerator = new AtomicLong(0); private long tokenId = 0; - private TokenState tokenState; + private volatile TokenState tokenState; private long queueWaitTimeout = 0; - private String offerResultDetail; + private long queueStartTime = -1; + private long queueEndTime = -1; - private boolean isTimeout = false; + private volatile String queueMsg = ""; - private final ReentrantLock tokenLock = new ReentrantLock(); - private final Condition tokenCond = tokenLock.newCondition(); + QueryQueue queryQueue = null; - private long queueStartTime = -1; - private long queueEndTime = -1; + // Object is just a placeholder, it's meaningless now + private CompletableFuture<Object> future; - public QueueToken(TokenState tokenState, long queueWaitTimeout, - String offerResultDetail) { + public QueueToken(long queueWaitTimeout, QueryQueue queryQueue) { this.tokenId = tokenIdGenerator.addAndGet(1); - this.tokenState = tokenState; this.queueWaitTimeout = queueWaitTimeout; - this.offerResultDetail = offerResultDetail; + this.queueStartTime = System.currentTimeMillis(); + this.queryQueue = queryQueue; + this.future = new CompletableFuture<>(); } - public boolean waitSignal(long queryTimeoutMillis) throws InterruptedException { - this.tokenLock.lock(); - try { - if (isTimeout) { - return false; - } - if (tokenState == TokenState.READY_TO_RUN) { - return true; - } - // If query timeout is less than queue wait timeout, then should use - // query timeout as wait timeout - long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? 
queueWaitTimeout : queryTimeoutMillis; - tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS); - if (tokenState == TokenState.CANCELLED) { - this.offerResultDetail = "query is cancelled in queue"; - return false; - } - // If wait timeout and is steal not ready to run, then return false - if (tokenState != TokenState.READY_TO_RUN) { - LOG.warn("wait in queue timeout, timeout = {}", waitTimeout); - isTimeout = true; - return false; - } else { - return true; - } - } catch (Throwable t) { - LOG.warn("meet execption when wait for signal", t); - // If any exception happens, set isTimeout to true and return false - // Then the caller will call returnToken to queue normally. - offerResultDetail = "meet exeption when wait for signal"; - isTimeout = true; - return false; - } finally { - this.tokenLock.unlock(); - this.setQueueTimeWhenQueueEnd(); - } + public void setQueueMsg(String msg) { + this.queueMsg = msg; } - public void signalForCancel() { - this.tokenLock.lock(); - try { - if (this.tokenState == TokenState.ENQUEUE_SUCCESS) { - tokenCond.signal(); - this.tokenState = TokenState.CANCELLED; - } - } catch (Throwable t) { - LOG.warn("error happens when signal for cancel", t); - } finally { - this.tokenLock.unlock(); - } + public void setTokenState(TokenState tokenState) { + this.tokenState = tokenState; } - public boolean signal() { - this.tokenLock.lock(); + public String getQueueMsg() { + return queueMsg; + } + + public void get(String queryId, int queryTimeout) throws UserException { + if (isReadyToRun()) { + return; + } + long waitTimeout = queueWaitTimeout > 0 ? Math.min(queueWaitTimeout, queryTimeout) : queryTimeout; + waitTimeout = waitTimeout <= 0 ? 4096 : waitTimeout; try { - // If current token is not ENQUEUE_SUCCESS, then it maybe has error - // not run it any more. 
- if (this.tokenState != TokenState.ENQUEUE_SUCCESS || isTimeout) { - return false; - } - this.tokenState = TokenState.READY_TO_RUN; - tokenCond.signal(); - return true; + future.get(waitTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new UserException("query queue timeout, timeout: " + waitTimeout + " ms "); + } catch (CancellationException e) { + throw new UserException("query is cancelled"); } catch (Throwable t) { - isTimeout = true; - offerResultDetail = "meet exception when signal"; - LOG.warn("failed to signal token", t); - return false; - } finally { - this.tokenLock.unlock(); + String errMsg = String.format("error happens when query {} queue", queryId); + LOG.error(errMsg, t); + throw new RuntimeException(errMsg, t); } } - public String getOfferResultDetail() { - return offerResultDetail; - } - - public boolean isReadyToRun() { - return this.tokenState == TokenState.READY_TO_RUN; - } - - public boolean isCancelled() { - return this.tokenState == TokenState.CANCELLED; - } - - public void setQueueTimeWhenOfferSuccess() { - long currentTime = System.currentTimeMillis(); - this.queueStartTime = currentTime; - this.queueEndTime = currentTime; + public void complete() { + this.queueEndTime = System.currentTimeMillis(); + this.tokenState = TokenState.READY_TO_RUN; + this.setQueueMsg("RUNNING"); + future.complete(null); } - public void setQueueTimeWhenQueueSuccess() { - long currentTime = System.currentTimeMillis(); - this.queueStartTime = currentTime; + public void notifyWaitQuery() { + this.queryQueue.notifyWaitQuery(); } - public void setQueueTimeWhenQueueEnd() { - this.queueEndTime = System.currentTimeMillis(); + public void cancel() { + future.cancel(true); } public long getQueueStartTime() { @@ -176,8 +124,8 @@ public class QueueToken implements Comparable<QueueToken> { return queueEndTime; } - public TokenState getTokenState() { - return tokenState; + public boolean isReadyToRun() { + return tokenState == TokenState.READY_TO_RUN; } 
@Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 8588e10cf34..2c200e425b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.proc.BaseProcResult; @@ -431,6 +432,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { List<String> row = new ArrayList<>(); row.add(String.valueOf(id)); row.add(name); + Pair<Integer, Integer> queryQueueDetail = qq != null ? qq.getQueryQueueDetail() : null; // skip id,name,running query,waiting query for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) { String key = WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i); @@ -472,9 +474,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { row.add(val + "%"); } } else if (QueryQueue.RUNNING_QUERY_NUM.equals(key)) { - row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum())); + row.add(queryQueueDetail == null ? "0" : String.valueOf(queryQueueDetail.first)); } else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) { - row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum())); + row.add(queryQueueDetail == null ? 
"0" : String.valueOf(queryQueueDetail.second)); } else if (TAG.equals(key)) { String val = properties.get(key); if (StringUtils.isEmpty(val)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 7d0f348d18b..9fb37242d84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -51,7 +51,6 @@ import org.apache.doris.plsql.metastore.PlsqlStoredProcedure; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QeProcessorImpl.QueryInfo; -import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -612,14 +611,8 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell()); } - TokenState tokenState = queryInfo.getQueueStatus(); - if (tokenState == null) { - trow.addToColumnValue(new TCell()); - } else if (tokenState == TokenState.READY_TO_RUN) { - trow.addToColumnValue(new TCell().setStringVal("RUNNING")); - } else { - trow.addToColumnValue(new TCell().setStringVal("QUEUED")); - } + String queueMsg = queryInfo.getQueueStatus(); + trow.addToColumnValue(new TCell().setStringVal(queueMsg)); trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql())); dataBatch.add(trow); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index d03d3595682..af921cd1821 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -29,6 +29,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParams; 
import org.apache.doris.thrift.TExecPlanFragmentResult; import org.apache.doris.thrift.TExportStatusResult; import org.apache.doris.thrift.TExportTaskRequest; +import org.apache.doris.thrift.TGetBeResourceRequest; +import org.apache.doris.thrift.TGetBeResourceResult; import org.apache.doris.thrift.TIngestBinlogRequest; import org.apache.doris.thrift.TIngestBinlogResult; import org.apache.doris.thrift.TNetworkAddress; @@ -148,6 +150,11 @@ public class GenericPoolTest { return null; } + @Override + public TGetBeResourceResult getBeResource(TGetBeResourceRequest request) throws TException { + return null; + } + @Override public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException { // TODO Auto-generated method stub diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 12273331634..de73404b886 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -47,6 +47,8 @@ import org.apache.doris.thrift.TExportState; import org.apache.doris.thrift.TExportStatusResult; import org.apache.doris.thrift.TExportTaskRequest; import org.apache.doris.thrift.TFinishTaskRequest; +import org.apache.doris.thrift.TGetBeResourceRequest; +import org.apache.doris.thrift.TGetBeResourceResult; import org.apache.doris.thrift.THeartbeatResult; import org.apache.doris.thrift.TIngestBinlogRequest; import org.apache.doris.thrift.TIngestBinlogResult; @@ -354,6 +356,11 @@ public class MockedBackendFactory { return new TPublishTopicResult(new TStatus(TStatusCode.OK)); } + @Override + public TGetBeResourceResult getBeResource(TGetBeResourceRequest request) throws TException { + return null; + } + @Override public TStatus submitExportTask(TExportTaskRequest request) throws TException { return new TStatus(TStatusCode.OK); diff --git 
a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index f80c66dd827..0255d0d61a3 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -242,6 +242,19 @@ struct TPublishTopicResult { 1: required Status.TStatus status } + +struct TGetBeResourceRequest { +} + +struct TGlobalResourceUsage { + 1: optional i64 mem_limit + 2: optional i64 mem_usage +} + +struct TGetBeResourceResult { + 1: optional TGlobalResourceUsage global_resource_usage +} + service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. // Returns as soon as all incoming data streams have been set up. @@ -298,4 +311,6 @@ service BackendService { TQueryIngestBinlogResult query_ingest_binlog(1: TQueryIngestBinlogRequest query_ingest_binlog_request); TPublishTopicResult publish_topic_info(1:TPublishTopicRequest topic_request); + + TGetBeResourceResult get_be_resource(1: TGetBeResourceRequest request); } diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 70d4a550890..ec34b489e0b 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -325,7 +325,7 @@ suite("test_crud_wlg") { test { sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from ${table_name};" - exception "query wait timeout" + exception "query queue timeout" } // test query queue running query/waiting num @@ -520,7 +520,7 @@ suite("test_crud_wlg") { sql "create workload group if not exists bypass_group properties ( 'max_concurrency'='0','max_queue_size'='0','queue_timeout'='0');" sql "set workload_group=bypass_group;" test { - sql "select count(1) from information_schema.active_queries;" + sql "select count(1) from ${table_name};" exception "query waiting queue is full" } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org