This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit b47348fd0478676be0c60f3d41d1b9e362361eb4 Author: wangbo <wan...@apache.org> AuthorDate: Mon Apr 8 18:09:42 2024 +0800 [Improvement](executor)cancel query when a query is queued (#33339) --- .../main/java/org/apache/doris/qe/Coordinator.java | 18 ++++++++++++++-- .../doris/resource/workloadgroup/QueueToken.java | 25 +++++++++++++++++++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8173e0c4d58..e05eff2c4e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -292,7 +292,7 @@ public class Coordinator implements CoordInterface { private final ExecutionProfile executionProfile; - private QueueToken queueToken = null; + private volatile QueueToken queueToken = null; private QueryQueue queryQueue = null; public ExecutionProfile getExecutionProfile() { @@ -625,7 +625,9 @@ public class Coordinator implements CoordInterface { if (context != null) { if (Config.enable_workload_group) { this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context)); - if (Config.enable_query_queue && !context.getSessionVariable().getBypassWorkloadGroup()) { + boolean shouldQueue = Config.enable_query_queue && !context.getSessionVariable() + .getBypassWorkloadGroup() && !isQueryCancelled(); + if (shouldQueue) { queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); if (queryQueue == null) { // This logic is actually useless, because when could not find query queue, it will @@ -1440,6 +1442,9 @@ public class Coordinator implements CoordInterface { // if any, as well as all plan fragments on remote nodes. public void cancel() { cancel(Types.PPlanFragmentCancelReason.USER_CANCEL); + if (queueToken != null) { + queueToken.signalForCancel(); + } } @Override @@ -1462,6 +1467,15 @@ public class Coordinator implements CoordInterface { } } + public boolean isQueryCancelled() { + lock(); + try { + return queryStatus.isCancelled(); + } finally { + unlock(); + } + } + private void cancelLatch() { if (instancesDoneLatch != null) { instancesDoneLatch.countDownToZero(new Status()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java index 6bf44c78828..0a982b81236 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java @@ -38,7 +38,8 @@ public class QueueToken implements Comparable<QueueToken> { public enum TokenState { ENQUEUE_SUCCESS, - READY_TO_RUN + READY_TO_RUN, + CANCELLED } static AtomicLong tokenIdGenerator = new AtomicLong(0); @@ -80,6 +81,10 @@ public class QueueToken implements Comparable<QueueToken> { // query timeout as wait timeout long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? queueWaitTimeout : queryTimeoutMillis; tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS); + if (tokenState == TokenState.CANCELLED) { + this.offerResultDetail = "query is cancelled in queue"; + return false; + } // If wait timeout and is steal not ready to run, then return false if (tokenState != TokenState.READY_TO_RUN) { LOG.warn("wait in queue timeout, timeout = {}", waitTimeout); @@ -101,6 +106,20 @@ public class QueueToken implements Comparable<QueueToken> { } } + public void signalForCancel() { + this.tokenLock.lock(); + try { + if (this.tokenState == TokenState.ENQUEUE_SUCCESS) { + tokenCond.signal(); + this.tokenState = TokenState.CANCELLED; + } + } catch (Throwable t) { + LOG.warn("error happens when signal for cancel", t); + } finally { + this.tokenLock.unlock(); + } + } + public boolean signal() { this.tokenLock.lock(); try { @@ -130,6 +149,10 @@ public class QueueToken implements Comparable<QueueToken> { return this.tokenState == TokenState.READY_TO_RUN; } + public boolean isCancelled() { + return this.tokenState == TokenState.CANCELLED; + } + public void setQueueTimeWhenOfferSuccess() { long currentTime = System.currentTimeMillis(); this.queueStartTime = currentTime; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org