This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 057d1f4d511 [bugfix](wg)refactor query queue for robustness (#37642)
057d1f4d511 is described below

commit 057d1f4d5118136d696cb64d24f75a53aa041b7a
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jul 11 14:30:46 2024 +0800

    [bugfix](wg)refactor query queue for robustness (#37642)
    
    ## Proposed changes
    To simplify QueryQueue's code and make it more robust, QueryQueue's
    usage is redefined as two steps:
    1. Call QueryQueue.getToken to obtain a token; the token's state may then
    be either running or queued.
    2. Release the QueueToken in coordinator.close, which either decrements
    runningQueryNum or removes the token from the waiting queue.
    We only need each of these two steps to be atomic; then it no longer
    matters whether QueueToken.get succeeds or throws an exception.
    So this PR removes the ```removeToken``` method and releases the QueueToken
    only in ```coord.close```.
    
    
    imported:  #35929
---
 .../src/main/java/org/apache/doris/qe/Coordinator.java      | 12 +++++++++---
 .../org/apache/doris/resource/workloadgroup/QueryQueue.java | 13 +++----------
 .../org/apache/doris/resource/workloadgroup/QueueToken.java |  3 ---
 3 files changed, 12 insertions(+), 16 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 1c742038b50..e17100e3072 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -660,9 +660,7 @@ public class Coordinator implements CoordInterface {
 
     @Override
     public void close() {
-        for (ScanNode scanNode : scanNodes) {
-            scanNode.stop();
-        }
+        // NOTE: all close method should be no exception
         if (queryQueue != null && queueToken != null) {
             try {
                 queryQueue.releaseAndNotify(queueToken);
@@ -670,6 +668,14 @@ public class Coordinator implements CoordInterface {
                 LOG.error("error happens when coordinator close ", t);
             }
         }
+
+        try {
+            for (ScanNode scanNode : scanNodes) {
+                scanNode.stop();
+            }
+        } catch (Throwable t) {
+            LOG.error("error happens when scannode stop ", t);
+        }
     }
 
     private void execInternal() throws Exception {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index d36042732bf..07d5939cc4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -129,9 +129,11 @@ public class QueryQueue {
     public void releaseAndNotify(QueueToken releaseToken) {
         queueLock.lock();
         try {
-            //NOTE:token's tokenState need to be locked by queueLock
+            // NOTE:token's tokenState need to be locked by queueLock
             if (releaseToken.isReadyToRun()) {
                 currentRunningQueryNum--;
+            } else {
+                priorityTokenQueue.remove(releaseToken);
             }
             Preconditions.checkArgument(currentRunningQueryNum >= 0);
             while (currentRunningQueryNum < maxConcurrency) {
@@ -165,13 +167,4 @@ public class QueryQueue {
         }
     }
 
-    public void removeToken(QueueToken queueToken) {
-        queueLock.lock();
-        try {
-            priorityTokenQueue.remove(queueToken);
-        } finally {
-            queueLock.unlock();
-        }
-    }
-
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index e9a17ca1ab4..20a46a526f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -76,13 +76,10 @@ public class QueueToken implements Comparable<QueueToken> {
         try {
             future.get(waitTimeout, TimeUnit.MILLISECONDS);
         } catch (TimeoutException e) {
-            queue.removeToken(this);
             throw new UserException("query queue timeout, timeout: " + waitTimeout + " ms ");
         } catch (CancellationException e) {
-            queue.removeToken(this);
             throw new UserException("query is cancelled");
         } catch (Throwable t) {
-            queue.removeToken(this);
             String errMsg = String.format("error happens when query {} queue", queryId);
             LOG.error(errMsg, t);
             throw new RuntimeException(errMsg, t);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to