This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9d397d891c9cccac3409e74853775c5a4729036c
Author: wangbo <wan...@apache.org>
AuthorDate: Fri Jun 14 10:42:12 2024 +0800

    [Refactor]Refactor query queue (#35929)
    
    ## Proposed changes
    Use CompletableFuture to refactor query queue to simplify code for
    better readability.
---
 .../main/java/org/apache/doris/qe/Coordinator.java |  11 +-
 .../doris/resource/workloadgroup/QueryQueue.java   |  42 +++---
 .../doris/resource/workloadgroup/QueueToken.java   | 143 ++++++---------------
 .../workload_manager_p0/test_curd_wlg.groovy       |   2 +-
 4 files changed, 65 insertions(+), 133 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3f379232bb1..be025ef63fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -635,11 +635,8 @@ public class Coordinator implements CoordInterface {
                         throw new UserException("could not find query queue");
                     }
                     queueToken = queryQueue.getToken();
-                    if 
(!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
-                        LOG.error("query (id=" + DebugUtil.printId(queryId) + 
") " + queueToken.getOfferResultDetail());
-                        queryQueue.returnToken(queueToken);
-                        throw new 
UserException(queueToken.getOfferResultDetail());
-                    }
+                    queueToken.get(DebugUtil.printId(queryId),
+                            this.queryOptions.getExecutionTimeout() * 1000);
                 }
             } else {
                 context.setWorkloadGroupName("");
@@ -652,7 +649,7 @@ public class Coordinator implements CoordInterface {
     public void close() {
         if (queryQueue != null && queueToken != null) {
             try {
-                queryQueue.returnToken(queueToken);
+                queryQueue.releaseAndNotify(queueToken);
             } catch (Throwable t) {
                 LOG.error("error happens when coordinator close ", t);
             }
@@ -1221,7 +1218,7 @@ public class Coordinator implements CoordInterface {
     public void cancel() {
         cancel(new Status(TStatusCode.CANCELLED, "query is cancelled by 
user"));
         if (queueToken != null) {
-            queueToken.signalForCancel();
+            queueToken.cancel();
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 5953edbf66e..d36042732bf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -106,17 +106,16 @@ public class QueryQueue {
                 LOG.info(this.debugString());
             }
             if (currentRunningQueryNum < maxConcurrency) {
+                QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, 
queueTimeout, this);
+                retToken.complete();
                 currentRunningQueryNum++;
-                QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, 
queueTimeout, "offer success");
-                retToken.setQueueTimeWhenOfferSuccess();
                 return retToken;
             }
             if (priorityTokenQueue.size() >= maxQueueSize) {
                 throw new UserException("query waiting queue is full, queue 
length=" + maxQueueSize);
             }
             QueueToken newQueryToken = new 
QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout,
-                    "query wait timeout " + queueTimeout + " ms");
-            newQueryToken.setQueueTimeWhenQueueSuccess();
+                    this);
             this.priorityTokenQueue.offer(newQueryToken);
             return newQueryToken;
         } finally {
@@ -127,35 +126,27 @@ public class QueryQueue {
         }
     }
 
-    // If the token is acquired and do work success, then call this method to 
release it.
-    public void returnToken(QueueToken token) {
+    public void releaseAndNotify(QueueToken releaseToken) {
         queueLock.lock();
         try {
-            // If current token is not in ready to run state, then it is still 
in the queue
-            // it is not running, just remove it.
-            if (!token.isReadyToRun()) {
-                this.priorityTokenQueue.remove(token);
-                return;
+            //NOTE:token's tokenState need to be locked by queueLock
+            if (releaseToken.isReadyToRun()) {
+                currentRunningQueryNum--;
             }
-            currentRunningQueryNum--;
             Preconditions.checkArgument(currentRunningQueryNum >= 0);
-            // If return token and find user changed concurrency num,  then 
maybe need signal
-            // more tokens.
             while (currentRunningQueryNum < maxConcurrency) {
-                QueueToken nextToken = this.priorityTokenQueue.poll();
-                if (nextToken != null) {
-                    if (nextToken.signal()) {
-                        ++currentRunningQueryNum;
-                    }
-                } else {
+                QueueToken queueToken = this.priorityTokenQueue.poll();
+                if (queueToken == null) {
                     break;
                 }
+                queueToken.complete();
+                currentRunningQueryNum++;
             }
         } finally {
+            queueLock.unlock();
             if (LOG.isDebugEnabled()) {
                 LOG.info(this.debugString());
             }
-            queueLock.unlock();
         }
     }
 
@@ -174,4 +165,13 @@ public class QueryQueue {
         }
     }
 
+    public void removeToken(QueueToken queueToken) {
+        queueLock.lock();
+        try {
+            priorityTokenQueue.remove(queueToken);
+        } finally {
+            queueLock.unlock();
+        }
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index 0a982b81236..e9a17ca1ab4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -17,13 +17,16 @@
 
 package org.apache.doris.resource.workloadgroup;
 
+import org.apache.doris.common.UserException;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 // used to mark QueryQueue offer result
 // if offer failed, then need to cancel query
@@ -38,8 +41,7 @@ public class QueueToken implements Comparable<QueueToken> {
 
     public enum TokenState {
         ENQUEUE_SUCCESS,
-        READY_TO_RUN,
-        CANCELLED
+        READY_TO_RUN
     }
 
     static AtomicLong tokenIdGenerator = new AtomicLong(0);
@@ -50,122 +52,51 @@ public class QueueToken implements Comparable<QueueToken> {
 
     private long queueWaitTimeout = 0;
 
-    private String offerResultDetail;
-
-    private boolean isTimeout = false;
-
-    private final ReentrantLock tokenLock = new ReentrantLock();
-    private final Condition tokenCond = tokenLock.newCondition();
-
     private long queueStartTime = -1;
     private long queueEndTime = -1;
 
-    public QueueToken(TokenState tokenState, long queueWaitTimeout,
-            String offerResultDetail) {
+    // Object is just a placeholder, it's meaningless now
+    private CompletableFuture<Object> future;
+    private QueryQueue queue;
+
+    public QueueToken(TokenState tokenState, long queueWaitTimeout, QueryQueue 
queryQueue) {
         this.tokenId = tokenIdGenerator.addAndGet(1);
         this.tokenState = tokenState;
         this.queueWaitTimeout = queueWaitTimeout;
-        this.offerResultDetail = offerResultDetail;
+        this.queue = queryQueue;
+        this.queueStartTime = System.currentTimeMillis();
+        this.future = new CompletableFuture<>();
     }
 
-    public boolean waitSignal(long queryTimeoutMillis) throws 
InterruptedException {
-        this.tokenLock.lock();
-        try {
-            if (isTimeout) {
-                return false;
-            }
-            if (tokenState == TokenState.READY_TO_RUN) {
-                return true;
-            }
-            // If query timeout is less than queue wait timeout, then should 
use
-            // query timeout as wait timeout
-            long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? 
queueWaitTimeout : queryTimeoutMillis;
-            tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS);
-            if (tokenState == TokenState.CANCELLED) {
-                this.offerResultDetail = "query is cancelled in queue";
-                return false;
-            }
-            // If wait timeout and is steal not ready to run, then return false
-            if (tokenState != TokenState.READY_TO_RUN) {
-                LOG.warn("wait in queue timeout, timeout = {}", waitTimeout);
-                isTimeout = true;
-                return false;
-            } else {
-                return true;
-            }
-        } catch (Throwable t) {
-            LOG.warn("meet execption when wait for signal", t);
-            // If any exception happens, set isTimeout to true and return false
-            // Then the caller will call returnToken to queue normally.
-            offerResultDetail = "meet exeption when wait for signal";
-            isTimeout = true;
-            return false;
-        } finally {
-            this.tokenLock.unlock();
-            this.setQueueTimeWhenQueueEnd();
+    public void get(String queryId, int queryTimeout) throws UserException {
+        if (isReadyToRun()) {
+            return;
         }
-    }
-
-    public void signalForCancel() {
-        this.tokenLock.lock();
+        long waitTimeout = Math.min(queueWaitTimeout, queryTimeout);
         try {
-            if (this.tokenState == TokenState.ENQUEUE_SUCCESS) {
-                tokenCond.signal();
-                this.tokenState = TokenState.CANCELLED;
-            }
+            future.get(waitTimeout, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+            queue.removeToken(this);
+            throw new UserException("query queue timeout, timeout: " + 
waitTimeout + " ms ");
+        } catch (CancellationException e) {
+            queue.removeToken(this);
+            throw new UserException("query is cancelled");
         } catch (Throwable t) {
-            LOG.warn("error happens when signal for cancel", t);
-        } finally {
-            this.tokenLock.unlock();
+            queue.removeToken(this);
+            String errMsg = String.format("error happens when query {} queue", 
queryId);
+            LOG.error(errMsg, t);
+            throw new RuntimeException(errMsg, t);
         }
     }
 
-    public boolean signal() {
-        this.tokenLock.lock();
-        try {
-            // If current token is not ENQUEUE_SUCCESS, then it maybe has error
-            // not run it any more.
-            if (this.tokenState != TokenState.ENQUEUE_SUCCESS || isTimeout) {
-                return false;
-            }
-            this.tokenState = TokenState.READY_TO_RUN;
-            tokenCond.signal();
-            return true;
-        } catch (Throwable t) {
-            isTimeout = true;
-            offerResultDetail = "meet exception when signal";
-            LOG.warn("failed to signal token", t);
-            return false;
-        } finally {
-            this.tokenLock.unlock();
-        }
-    }
-
-    public String getOfferResultDetail() {
-        return offerResultDetail;
-    }
-
-    public boolean isReadyToRun() {
-        return this.tokenState == TokenState.READY_TO_RUN;
-    }
-
-    public boolean isCancelled() {
-        return this.tokenState == TokenState.CANCELLED;
-    }
-
-    public void setQueueTimeWhenOfferSuccess() {
-        long currentTime = System.currentTimeMillis();
-        this.queueStartTime = currentTime;
-        this.queueEndTime = currentTime;
-    }
-
-    public void setQueueTimeWhenQueueSuccess() {
-        long currentTime = System.currentTimeMillis();
-        this.queueStartTime = currentTime;
+    public void complete() {
+        this.queueEndTime = System.currentTimeMillis();
+        this.tokenState = TokenState.READY_TO_RUN;
+        future.complete(null);
     }
 
-    public void setQueueTimeWhenQueueEnd() {
-        this.queueEndTime = System.currentTimeMillis();
+    public void cancel() {
+        future.cancel(true);
     }
 
     public long getQueueStartTime() {
@@ -180,6 +111,10 @@ public class QueueToken implements Comparable<QueueToken> {
         return tokenState;
     }
 
+    public boolean isReadyToRun() {
+        return tokenState == TokenState.READY_TO_RUN;
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (this == obj) {
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy 
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 62eb762ff9b..95763631d1e 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -332,7 +332,7 @@ suite("test_crud_wlg") {
     test {
         sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from 
${table_name};"
 
-        exception "query wait timeout"
+        exception "query queue timeout"
     }
 
     // test query queue running query/waiting num


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to