This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 45cb1b041ce [Improment]Query queued by be memory (#37559) 45cb1b041ce is described below commit 45cb1b041ce4671e058a1d09872e0097584f1b29 Author: wangbo <wan...@apache.org> AuthorDate: Sat Jul 20 15:39:19 2024 +0800 [Improment]Query queued by be memory (#37559) ## Proposed changes Add a back-pressure mechanism based on memory usage: when any BE's memory usage is bigger than ```query_queue_by_be_used_memory```, queries can be queued in the FE. --- be/src/service/backend_service.cpp | 10 ++ be/src/service/backend_service.h | 3 + .../main/java/org/apache/doris/common/Config.java | 11 ++ .../main/java/org/apache/doris/catalog/Env.java | 9 ++ .../main/java/org/apache/doris/qe/Coordinator.java | 20 ++- .../java/org/apache/doris/qe/QeProcessorImpl.java | 7 +- .../apache/doris/resource/AdmissionControl.java | 156 +++++++++++++++++++++ .../doris/resource/workloadgroup/QueryQueue.java | 90 +++++++----- .../doris/resource/workloadgroup/QueueToken.java | 36 +++-- .../resource/workloadgroup/WorkloadGroup.java | 6 +- .../doris/tablefunction/MetadataGenerator.java | 11 +- .../org/apache/doris/common/GenericPoolTest.java | 7 + .../apache/doris/utframe/MockedBackendFactory.java | 7 + gensrc/thrift/BackendService.thrift | 14 ++ .../workload_manager_p0/test_curd_wlg.groovy | 2 +- 15 files changed, 325 insertions(+), 64 deletions(-) diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 1d85a2bca69..4effc225110 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -1297,4 +1297,14 @@ void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse response.__set_report_exec_status_params(*report_exec_status_params); } +void BaseBackendService::get_be_resource(TGetBeResourceResult& result, + const TGetBeResourceRequest& request) { + int64_t mem_usage = PerfCounters::get_vm_rss(); + int64_t mem_limit = MemInfo::mem_limit(); + 
TGlobalResourceUsage global_resource_usage; + global_resource_usage.__set_mem_limit(mem_limit); + global_resource_usage.__set_mem_usage(mem_usage); + result.__set_global_resource_usage(global_resource_usage); +} + } // namespace doris diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 4a04f16e853..0ada1bf5393 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -138,6 +138,9 @@ public: void get_realtime_exec_status(TGetRealtimeExecStatusResponse& response, const TGetRealtimeExecStatusRequest& request) override; + void get_be_resource(TGetBeResourceResult& result, + const TGetBeResourceRequest& request) override; + //////////////////////////////////////////////////////////////////////////// // begin cloud backend functions //////////////////////////////////////////////////////////////////////////// diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 39e8e4f1c17..7af0da4f9d6 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1767,6 +1767,17 @@ public class Config extends ConfigBase { @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL) public static boolean enable_cpu_hard_limit = false; + @ConfField(mutable = true, description = { + "当BE内存用量大于该值时,查询会进入排队逻辑,默认值为-1,代表该值不生效。取值范围0~1的小数", + "When be memory usage bigger than this value, query could queue, " + + "default value is -1, means this value not work. 
Decimal value range from 0 to 1"}) + public static double query_queue_by_be_used_memory = -1; + + @ConfField(mutable = true, description = {"基于内存反压场景FE定时拉取BE内存用量的时间间隔", + "In the scenario of memory backpressure, " + + "the time interval for obtaining BE memory usage at regular intervals"}) + public static long get_be_resource_usage_interval_ms = 10000; + @ConfField(mutable = false, masterOnly = true) public static int backend_rpc_timeout_ms = 60000; // 1 min diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index c999024d359..49157b19ffb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -242,6 +242,7 @@ import org.apache.doris.qe.JournalObservable; import org.apache.doris.qe.QueryCancelWorker; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.VariableMgr; +import org.apache.doris.resource.AdmissionControl; import org.apache.doris.resource.Tag; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr; @@ -519,6 +520,8 @@ public class Env { private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr; + private AdmissionControl admissionControl; + private QueryStats queryStats; private StatisticsCleaner statisticsCleaner; @@ -784,6 +787,7 @@ public class Env { this.workloadGroupMgr = new WorkloadGroupMgr(); this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr(); this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr(); + this.admissionControl = new AdmissionControl(systemInfo); this.queryStats = new QueryStats(); this.loadManagerAdapter = new LoadManagerAdapter(); this.hiveTransactionMgr = new HiveTransactionMgr(); @@ -899,6 +903,10 @@ public class Env { return workloadRuntimeStatusMgr; } + public AdmissionControl getAdmissionControl() { + return admissionControl; + } + public 
ExternalMetaIdMgr getExternalMetaIdMgr() { return externalMetaIdMgr; } @@ -1819,6 +1827,7 @@ public class Env { workloadGroupMgr.start(); workloadSchedPolicyMgr.start(); workloadRuntimeStatusMgr.start(); + admissionControl.start(); splitSourceManager.start(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 19915a070f1..17ea6ceb983 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -64,6 +64,7 @@ import org.apache.doris.planner.ResultSink; import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.SchemaScanNode; import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.UnionNode; import org.apache.doris.proto.InternalService; @@ -618,6 +619,22 @@ public class Coordinator implements CoordInterface { return fragmentParams; } + private boolean shouldQueue() { + boolean ret = Config.enable_query_queue && !context.getSessionVariable() + .getBypassWorkloadGroup() && !isQueryCancelled(); + if (!ret) { + return false; + } + // a query with ScanNode need not queue only when all its scan node is SchemaScanNode + for (ScanNode scanNode : this.scanNodes) { + boolean isSchemaScanNode = scanNode instanceof SchemaScanNode; + if (!isSchemaScanNode) { + return true; + } + } + return false; + } + // Initiate asynchronous execution of query. Returns as soon as all plan fragments // have started executing at their respective backends. 
// 'Request' must contain at least a coordinator plan fragment (ie, can't @@ -629,8 +646,7 @@ public class Coordinator implements CoordInterface { if (context != null) { if (Config.enable_workload_group) { this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context)); - boolean shouldQueue = Config.enable_query_queue && !context.getSessionVariable() - .getBypassWorkloadGroup() && !isQueryCancelled(); + boolean shouldQueue = this.shouldQueue(); if (shouldQueue) { queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); if (queryQueue == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 38b0ac192d2..4f9f51a951e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -26,7 +26,6 @@ import org.apache.doris.common.profile.ExecutionProfile; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ProfileManager; import org.apache.doris.metric.MetricRepo; -import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryProfile; @@ -374,11 +373,11 @@ public final class QeProcessorImpl implements QeProcessor { return -1; } - public TokenState getQueueStatus() { + public String getQueueStatus() { if (coord.getQueueToken() != null) { - return coord.getQueueToken().getTokenState(); + return coord.getQueueToken().getQueueMsg(); } - return null; + return ""; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java new file mode 100644 index 00000000000..480afcde5b6 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource; + +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.resource.workloadgroup.QueueToken; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TGetBeResourceRequest; +import org.apache.doris.thrift.TGetBeResourceResult; +import org.apache.doris.thrift.TGlobalResourceUsage; +import org.apache.doris.thrift.TNetworkAddress; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class AdmissionControl extends MasterDaemon { + + public static final Logger LOG = LogManager.getLogger(AdmissionControl.class); + + private volatile boolean isAllBeMemoryEnough = true; + + private double currentMemoryLimit = 0; + + private SystemInfoService clusterInfoService; + + public 
AdmissionControl(SystemInfoService clusterInfoService) { + super("get-be-resource-usage-thread", Config.get_be_resource_usage_interval_ms); + this.clusterInfoService = clusterInfoService; + } + + private ConcurrentLinkedQueue<QueueToken> queryWaitQueue = new ConcurrentLinkedQueue(); + + public void addQueueToken(QueueToken queryQueue) { + queryWaitQueue.offer(queryQueue); + } + + @Override + protected void runAfterCatalogReady() { + getBeMemoryUsage(); + notifyWaitQuery(); + } + + public void getBeMemoryUsage() { + if (Config.query_queue_by_be_used_memory < 0) { + this.isAllBeMemoryEnough = true; + return; + } + Collection<Backend> backends = clusterInfoService.getIdToBackend().values(); + this.currentMemoryLimit = Config.query_queue_by_be_used_memory; + boolean tmpIsAllBeMemoryEnough = true; + for (Backend be : backends) { + if (!be.isAlive()) { + continue; + } + TNetworkAddress address = null; + BackendService.Client client = null; + TGetBeResourceResult result = null; + boolean rpcOk = true; + try { + address = new TNetworkAddress(be.getHost(), be.getBePort()); + client = ClientPool.backendPool.borrowObject(address, 5000); + result = client.getBeResource(new TGetBeResourceRequest()); + } catch (Throwable t) { + rpcOk = false; + LOG.warn("get be {} resource failed, ", be.getHost(), t); + } finally { + try { + if (rpcOk) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } + } catch (Throwable e) { + LOG.warn("return rpc client failed. 
related backend[{}]", be.getHost(), + e); + } + } + if (result != null && result.isSetGlobalResourceUsage()) { + TGlobalResourceUsage globalResourceUsage = result.getGlobalResourceUsage(); + if (globalResourceUsage != null && globalResourceUsage.isSetMemLimit() + && globalResourceUsage.isSetMemUsage()) { + long memUsageL = globalResourceUsage.getMemUsage(); + long memLimitL = globalResourceUsage.getMemLimit(); + double memUsage = Double.valueOf(String.valueOf(memUsageL)); + double memLimit = Double.valueOf(String.valueOf(memLimitL)); + double memUsagePercent = memUsage / memLimit; + + if (memUsagePercent > this.currentMemoryLimit) { + tmpIsAllBeMemoryEnough = false; + } + LOG.debug( + "be ip:{}, mem limit:{}, mem usage:{}, mem usage percent:{}, " + + "query queue mem:{}, query wait size:{}", + be.getHost(), memLimitL, memUsageL, memUsagePercent, this.currentMemoryLimit, + this.queryWaitQueue.size()); + } + } + } + this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough; + } + + public void notifyWaitQuery() { + if (!isAllBeMemoryEnough()) { + return; + } + int waitQueryCountSnapshot = queryWaitQueue.size(); + Iterator<QueueToken> queueTokenIterator = queryWaitQueue.iterator(); + while (waitQueryCountSnapshot > 0 && queueTokenIterator.hasNext()) { + QueueToken queueToken = queueTokenIterator.next(); + queueToken.notifyWaitQuery(); + waitQueryCountSnapshot--; + } + } + + public void removeQueueToken(QueueToken queueToken) { + queryWaitQueue.remove(queueToken); + } + + public boolean isAllBeMemoryEnough() { + return isAllBeMemoryEnough; + } + + //TODO(wb): add more resource type + public boolean checkResourceAvailable(QueueToken queueToken) { + if (isAllBeMemoryEnough()) { + return true; + } else { + queueToken.setQueueMsg("WAIT_BE_MEMORY"); + return false; + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index 07d5939cc4f..ba2d2526f2e 
100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -17,14 +17,18 @@ package org.apache.doris.resource.workloadgroup; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.resource.AdmissionControl; import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; -import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.LinkedList; import java.util.PriorityQueue; +import java.util.Queue; import java.util.concurrent.locks.ReentrantLock; // note(wb) refer java BlockingQueue, but support altering capacity @@ -38,8 +42,6 @@ public class QueryQueue { private int maxConcurrency; private int maxQueueSize; private int queueTimeout; // ms - // running property - private volatile int currentRunningQueryNum; public static final String RUNNING_QUERY_NUM = "running_query_num"; public static final String WAITING_QUERY_NUM = "waiting_query_num"; @@ -48,16 +50,13 @@ public class QueryQueue { private long propVersion; - private PriorityQueue<QueueToken> priorityTokenQueue; + private PriorityQueue<QueueToken> waitingQueryQueue; + private Queue<QueueToken> runningQueryQueue; - int getCurrentRunningQueryNum() { - return currentRunningQueryNum; - } - - int getCurrentWaitingQueryNum() { + Pair<Integer, Integer> getQueryQueueDetail() { try { queueLock.lock(); - return priorityTokenQueue.size(); + return Pair.of(runningQueryQueue.size(), waitingQueryQueue.size()); } finally { queueLock.unlock(); } @@ -89,35 +88,47 @@ public class QueryQueue { this.maxQueueSize = maxQueueSize; this.queueTimeout = queueTimeout; this.propVersion = propVersion; - this.priorityTokenQueue = new PriorityQueue<QueueToken>(); + this.waitingQueryQueue = new PriorityQueue<QueueToken>(); + 
this.runningQueryQueue = new LinkedList<QueueToken>(); } public String debugString() { return "wgId= " + wgId + ", version=" + this.propVersion + ",maxConcurrency=" + maxConcurrency - + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout - + ", currentRunningQueryNum=" + currentRunningQueryNum - + ", currentWaitingQueryNum=" + priorityTokenQueue.size(); + + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout + ", currentRunningQueryNum=" + + runningQueryQueue.size() + ", currentWaitingQueryNum=" + waitingQueryQueue.size(); } public QueueToken getToken() throws UserException { + AdmissionControl admissionControl = Env.getCurrentEnv().getAdmissionControl(); queueLock.lock(); try { if (LOG.isDebugEnabled()) { LOG.info(this.debugString()); } - if (currentRunningQueryNum < maxConcurrency) { - QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, queueTimeout, this); - retToken.complete(); - currentRunningQueryNum++; - return retToken; - } - if (priorityTokenQueue.size() >= maxQueueSize) { + QueueToken queueToken = new QueueToken(queueTimeout, this); + + boolean isReachMaxCon = runningQueryQueue.size() >= maxConcurrency; + boolean isResourceAvailable = admissionControl.checkResourceAvailable(queueToken); + if (!isReachMaxCon && isResourceAvailable) { + runningQueryQueue.offer(queueToken); + queueToken.complete(); + return queueToken; + } else if (waitingQueryQueue.size() >= maxQueueSize) { throw new UserException("query waiting queue is full, queue length=" + maxQueueSize); + } else { + if (isReachMaxCon) { + queueToken.setQueueMsg("WAIT_IN_QUEUE"); + } + queueToken.setTokenState(TokenState.ENQUEUE_SUCCESS); + this.waitingQueryQueue.offer(queueToken); + // if a query is added to wg's queue but not in AdmissionControl's + // queue may be blocked by be memory later, + // then we should put query to AdmissionControl in releaseAndNotify, it's too complicated. 
+ // To simplify the code logic, put all waiting query to AdmissionControl, + // waiting query can be notified when query finish or memory is enough. + admissionControl.addQueueToken(queueToken); } - QueueToken newQueryToken = new QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout, - this); - this.priorityTokenQueue.offer(newQueryToken); - return newQueryToken; + return queueToken; } finally { if (LOG.isDebugEnabled()) { LOG.info(this.debugString()); @@ -126,23 +137,30 @@ public class QueryQueue { } } + public void notifyWaitQuery() { + releaseAndNotify(null); + } + public void releaseAndNotify(QueueToken releaseToken) { + AdmissionControl admissionControl = Env.getCurrentEnv().getAdmissionControl(); queueLock.lock(); try { - // NOTE:token's tokenState need to be locked by queueLock - if (releaseToken.isReadyToRun()) { - currentRunningQueryNum--; - } else { - priorityTokenQueue.remove(releaseToken); - } - Preconditions.checkArgument(currentRunningQueryNum >= 0); - while (currentRunningQueryNum < maxConcurrency) { - QueueToken queueToken = this.priorityTokenQueue.poll(); + runningQueryQueue.remove(releaseToken); + waitingQueryQueue.remove(releaseToken); + admissionControl.removeQueueToken(releaseToken); + while (runningQueryQueue.size() < maxConcurrency) { + QueueToken queueToken = waitingQueryQueue.peek(); if (queueToken == null) { break; } - queueToken.complete(); - currentRunningQueryNum++; + if (admissionControl.checkResourceAvailable(queueToken)) { + queueToken.complete(); + runningQueryQueue.offer(queueToken); + waitingQueryQueue.remove(); + admissionControl.removeQueueToken(queueToken); + } else { + break; + } } } finally { queueLock.unlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java index 20a46a526f5..748c0c21bda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java @@ -48,31 +48,46 @@ public class QueueToken implements Comparable<QueueToken> { private long tokenId = 0; - private TokenState tokenState; + private volatile TokenState tokenState; private long queueWaitTimeout = 0; private long queueStartTime = -1; private long queueEndTime = -1; + private volatile String queueMsg = ""; + + QueryQueue queryQueue = null; + // Object is just a placeholder, it's meaningless now private CompletableFuture<Object> future; - private QueryQueue queue; - public QueueToken(TokenState tokenState, long queueWaitTimeout, QueryQueue queryQueue) { + public QueueToken(long queueWaitTimeout, QueryQueue queryQueue) { this.tokenId = tokenIdGenerator.addAndGet(1); - this.tokenState = tokenState; this.queueWaitTimeout = queueWaitTimeout; - this.queue = queryQueue; this.queueStartTime = System.currentTimeMillis(); + this.queryQueue = queryQueue; this.future = new CompletableFuture<>(); } + public void setQueueMsg(String msg) { + this.queueMsg = msg; + } + + public void setTokenState(TokenState tokenState) { + this.tokenState = tokenState; + } + + public String getQueueMsg() { + return queueMsg; + } + public void get(String queryId, int queryTimeout) throws UserException { if (isReadyToRun()) { return; } - long waitTimeout = Math.min(queueWaitTimeout, queryTimeout); + long waitTimeout = queueWaitTimeout > 0 ? Math.min(queueWaitTimeout, queryTimeout) : queryTimeout; + waitTimeout = waitTimeout <= 0 ? 
4096 : waitTimeout; try { future.get(waitTimeout, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { @@ -89,9 +104,14 @@ public class QueueToken implements Comparable<QueueToken> { public void complete() { this.queueEndTime = System.currentTimeMillis(); this.tokenState = TokenState.READY_TO_RUN; + this.setQueueMsg("RUNNING"); future.complete(null); } + public void notifyWaitQuery() { + this.queryQueue.notifyWaitQuery(); + } + public void cancel() { future.cancel(true); } @@ -104,10 +124,6 @@ public class QueueToken implements Comparable<QueueToken> { return queueEndTime; } - public TokenState getTokenState() { - return tokenState; - } - public boolean isReadyToRun() { return tokenState == TokenState.READY_TO_RUN; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index e5ec2c619b6..d96905af233 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -20,6 +20,7 @@ package org.apache.doris.resource.workloadgroup; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.proc.BaseProcResult; @@ -420,6 +421,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { List<String> row = new ArrayList<>(); row.add(String.valueOf(id)); row.add(name); + Pair<Integer, Integer> queryQueueDetail = qq != null ? 
qq.getQueryQueueDetail() : null; // skip id,name,running query,waiting query for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) { String key = WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i); @@ -461,9 +463,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { row.add(val + "%"); } } else if (QueryQueue.RUNNING_QUERY_NUM.equals(key)) { - row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum())); + row.add(queryQueueDetail == null ? "0" : String.valueOf(queryQueueDetail.first)); } else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) { - row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum())); + row.add(queryQueueDetail == null ? "0" : String.valueOf(queryQueueDetail.second)); } else if (TAG.equals(key)) { String val = properties.get(key); if (StringUtils.isEmpty(val)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 91690e23b95..3da993f5190 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -55,7 +55,6 @@ import org.apache.doris.plsql.metastore.PlsqlStoredProcedure; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QeProcessorImpl.QueryInfo; -import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -593,14 +592,8 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell()); } - TokenState tokenState = queryInfo.getQueueStatus(); - if (tokenState == null) { - trow.addToColumnValue(new TCell()); - } else if (tokenState == TokenState.READY_TO_RUN) { - 
trow.addToColumnValue(new TCell().setStringVal("RUNNING")); - } else { - trow.addToColumnValue(new TCell().setStringVal("QUEUED")); - } + String queueMsg = queryInfo.getQueueStatus(); + trow.addToColumnValue(new TCell().setStringVal(queueMsg)); trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql())); dataBatch.add(trow); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index 31fffe6a332..df6b5d440c8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -31,6 +31,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentResult; import org.apache.doris.thrift.TExportStatusResult; import org.apache.doris.thrift.TExportTaskRequest; +import org.apache.doris.thrift.TGetBeResourceRequest; +import org.apache.doris.thrift.TGetBeResourceResult; import org.apache.doris.thrift.TGetRealtimeExecStatusRequest; import org.apache.doris.thrift.TGetRealtimeExecStatusResponse; import org.apache.doris.thrift.TGetTopNHotPartitionsRequest; @@ -160,6 +162,11 @@ public class GenericPoolTest { return null; } + @Override + public TGetBeResourceResult getBeResource(TGetBeResourceRequest request) throws TException { + return null; + } + @Override public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException { // TODO Auto-generated method stub diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 3934e140f67..f82ca02bbc3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -49,6 +49,8 @@ import org.apache.doris.thrift.TExportState; import 
org.apache.doris.thrift.TExportStatusResult; import org.apache.doris.thrift.TExportTaskRequest; import org.apache.doris.thrift.TFinishTaskRequest; +import org.apache.doris.thrift.TGetBeResourceRequest; +import org.apache.doris.thrift.TGetBeResourceResult; import org.apache.doris.thrift.TGetRealtimeExecStatusRequest; import org.apache.doris.thrift.TGetRealtimeExecStatusResponse; import org.apache.doris.thrift.TGetTopNHotPartitionsRequest; @@ -366,6 +368,11 @@ public class MockedBackendFactory { return new TPublishTopicResult(new TStatus(TStatusCode.OK)); } + @Override + public TGetBeResourceResult getBeResource(TGetBeResourceRequest request) throws TException { + return null; + } + @Override public TStatus submitExportTask(TExportTaskRequest request) throws TException { return new TStatus(TStatusCode.OK); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 26cf411f739..39b49ca5462 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -333,6 +333,18 @@ struct TGetRealtimeExecStatusResponse { 2: optional FrontendService.TReportExecStatusParams report_exec_status_params } +struct TGetBeResourceRequest { +} + +struct TGlobalResourceUsage { + 1: optional i64 mem_limit + 2: optional i64 mem_usage +} + +struct TGetBeResourceResult { + 1: optional TGlobalResourceUsage global_resource_usage +} + service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. // Returns as soon as all incoming data streams have been set up. 
@@ -401,4 +413,6 @@ service BackendService { TPublishTopicResult publish_topic_info(1:TPublishTopicRequest topic_request); TGetRealtimeExecStatusResponse get_realtime_exec_status(1:TGetRealtimeExecStatusRequest request); + + TGetBeResourceResult get_be_resource(1: TGetBeResourceRequest request); } diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 091bbf44c82..f84e913ec35 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -536,7 +536,7 @@ suite("test_crud_wlg") { sql "create workload group if not exists bypass_group properties ( 'max_concurrency'='0','max_queue_size'='0','queue_timeout'='0');" sql "set workload_group=bypass_group;" test { - sql "select count(1) from information_schema.active_queries;" + sql "select count(1) from ${table_name};" exception "query waiting queue is full" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org