This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b47348fd0478676be0c60f3d41d1b9e362361eb4
Author: wangbo <wan...@apache.org>
AuthorDate: Mon Apr 8 18:09:42 2024 +0800

    [Improvement](executor)cancel query when a query is queued (#33339)
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 18 ++++++++++++++--
 .../doris/resource/workloadgroup/QueueToken.java   | 25 +++++++++++++++++++++-
 2 files changed, 40 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8173e0c4d58..e05eff2c4e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -292,7 +292,7 @@ public class Coordinator implements CoordInterface {
 
     private final ExecutionProfile executionProfile;
 
-    private QueueToken queueToken = null;
+    private volatile QueueToken queueToken = null;
     private QueryQueue queryQueue = null;
 
     public ExecutionProfile getExecutionProfile() {
@@ -625,7 +625,9 @@ public class Coordinator implements CoordInterface {
         if (context != null) {
             if (Config.enable_workload_group) {
                 this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
-                if (Config.enable_query_queue && !context.getSessionVariable().getBypassWorkloadGroup()) {
+                boolean shouldQueue = Config.enable_query_queue && !context.getSessionVariable()
+                        .getBypassWorkloadGroup() && !isQueryCancelled();
+                if (shouldQueue) {
                     queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
                     if (queryQueue == null) {
                         // This logic is actually useless, because when could not find query queue, it will
@@ -1440,6 +1442,9 @@ public class Coordinator implements CoordInterface {
     // if any, as well as all plan fragments on remote nodes.
     public void cancel() {
         cancel(Types.PPlanFragmentCancelReason.USER_CANCEL);
+        if (queueToken != null) {
+            queueToken.signalForCancel();
+        }
     }
 
     @Override
@@ -1462,6 +1467,15 @@ public class Coordinator implements CoordInterface {
         }
     }
 
+    public boolean isQueryCancelled() {
+        lock();
+        try {
+            return queryStatus.isCancelled();
+        } finally {
+            unlock();
+        }
+    }
+
     private void cancelLatch() {
         if (instancesDoneLatch != null) {
             instancesDoneLatch.countDownToZero(new Status());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index 6bf44c78828..0a982b81236 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -38,7 +38,8 @@ public class QueueToken implements Comparable<QueueToken> {
 
     public enum TokenState {
         ENQUEUE_SUCCESS,
-        READY_TO_RUN
+        READY_TO_RUN,
+        CANCELLED
     }
 
     static AtomicLong tokenIdGenerator = new AtomicLong(0);
@@ -80,6 +81,10 @@ public class QueueToken implements Comparable<QueueToken> {
             // query timeout as wait timeout
             long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? queueWaitTimeout : queryTimeoutMillis;
             tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS);
+            if (tokenState == TokenState.CANCELLED) {
+                this.offerResultDetail = "query is cancelled in queue";
+                return false;
+            }
             // If wait timeout and is steal not ready to run, then return false
             if (tokenState != TokenState.READY_TO_RUN) {
                 LOG.warn("wait in queue timeout, timeout = {}", waitTimeout);
@@ -101,6 +106,20 @@ public class QueueToken implements Comparable<QueueToken> {
         }
     }
 
+    public void signalForCancel() {
+        this.tokenLock.lock();
+        try {
+            if (this.tokenState == TokenState.ENQUEUE_SUCCESS) {
+                tokenCond.signal();
+                this.tokenState = TokenState.CANCELLED;
+            }
+        } catch (Throwable t) {
+            LOG.warn("error happens when signal for cancel", t);
+        } finally {
+            this.tokenLock.unlock();
+        }
+    }
+
     public boolean signal() {
         this.tokenLock.lock();
         try {
@@ -130,6 +149,10 @@ public class QueueToken implements Comparable<QueueToken> {
         return this.tokenState == TokenState.READY_TO_RUN;
     }
 
+    public boolean isCancelled() {
+        return this.tokenState == TokenState.CANCELLED;
+    }
+
     public void setQueueTimeWhenOfferSuccess() {
         long currentTime = System.currentTimeMillis();
         this.queueStartTime = currentTime;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to