This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9d397d891c9cccac3409e74853775c5a4729036c Author: wangbo <wan...@apache.org> AuthorDate: Fri Jun 14 10:42:12 2024 +0800 [Refactor]Refactor query queue (#35929) ## Proposed changes Use CompletableFuture to refactor query queue to simplify code for better readability. --- .../main/java/org/apache/doris/qe/Coordinator.java | 11 +- .../doris/resource/workloadgroup/QueryQueue.java | 42 +++--- .../doris/resource/workloadgroup/QueueToken.java | 143 ++++++--------------- .../workload_manager_p0/test_curd_wlg.groovy | 2 +- 4 files changed, 65 insertions(+), 133 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 3f379232bb1..be025ef63fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -635,11 +635,8 @@ public class Coordinator implements CoordInterface { throw new UserException("could not find query queue"); } queueToken = queryQueue.getToken(); - if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) { - LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail()); - queryQueue.returnToken(queueToken); - throw new UserException(queueToken.getOfferResultDetail()); - } + queueToken.get(DebugUtil.printId(queryId), + this.queryOptions.getExecutionTimeout() * 1000); } } else { context.setWorkloadGroupName(""); @@ -652,7 +649,7 @@ public class Coordinator implements CoordInterface { public void close() { if (queryQueue != null && queueToken != null) { try { - queryQueue.returnToken(queueToken); + queryQueue.releaseAndNotify(queueToken); } catch (Throwable t) { LOG.error("error happens when coordinator close ", t); } @@ -1221,7 +1218,7 @@ public class Coordinator implements CoordInterface { public void cancel() { cancel(new Status(TStatusCode.CANCELLED, "query is cancelled by user")); if (queueToken != null) { - 
queueToken.signalForCancel(); + queueToken.cancel(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index 5953edbf66e..d36042732bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -106,17 +106,16 @@ public class QueryQueue { LOG.info(this.debugString()); } if (currentRunningQueryNum < maxConcurrency) { + QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, queueTimeout, this); + retToken.complete(); currentRunningQueryNum++; - QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success"); - retToken.setQueueTimeWhenOfferSuccess(); return retToken; } if (priorityTokenQueue.size() >= maxQueueSize) { throw new UserException("query waiting queue is full, queue length=" + maxQueueSize); } QueueToken newQueryToken = new QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout, - "query wait timeout " + queueTimeout + " ms"); - newQueryToken.setQueueTimeWhenQueueSuccess(); + this); this.priorityTokenQueue.offer(newQueryToken); return newQueryToken; } finally { @@ -127,35 +126,27 @@ public class QueryQueue { } } - // If the token is acquired and do work success, then call this method to release it. - public void returnToken(QueueToken token) { + public void releaseAndNotify(QueueToken releaseToken) { queueLock.lock(); try { - // If current token is not in ready to run state, then it is still in the queue - // it is not running, just remove it. 
- if (!token.isReadyToRun()) { - this.priorityTokenQueue.remove(token); - return; + //NOTE:token's tokenState need to be locked by queueLock + if (releaseToken.isReadyToRun()) { + currentRunningQueryNum--; } - currentRunningQueryNum--; Preconditions.checkArgument(currentRunningQueryNum >= 0); - // If return token and find user changed concurrency num, then maybe need signal - // more tokens. while (currentRunningQueryNum < maxConcurrency) { - QueueToken nextToken = this.priorityTokenQueue.poll(); - if (nextToken != null) { - if (nextToken.signal()) { - ++currentRunningQueryNum; - } - } else { + QueueToken queueToken = this.priorityTokenQueue.poll(); + if (queueToken == null) { break; } + queueToken.complete(); + currentRunningQueryNum++; } } finally { + queueLock.unlock(); if (LOG.isDebugEnabled()) { LOG.info(this.debugString()); } - queueLock.unlock(); } } @@ -174,4 +165,13 @@ public class QueryQueue { } } + public void removeToken(QueueToken queueToken) { + queueLock.lock(); + try { + priorityTokenQueue.remove(queueToken); + } finally { + queueLock.unlock(); + } + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java index 0a982b81236..e9a17ca1ab4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java @@ -17,13 +17,16 @@ package org.apache.doris.resource.workloadgroup; +import org.apache.doris.common.UserException; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import 
java.util.concurrent.locks.ReentrantLock; // used to mark QueryQueue offer result // if offer failed, then need to cancel query @@ -38,8 +41,7 @@ public class QueueToken implements Comparable<QueueToken> { public enum TokenState { ENQUEUE_SUCCESS, - READY_TO_RUN, - CANCELLED + READY_TO_RUN } static AtomicLong tokenIdGenerator = new AtomicLong(0); @@ -50,122 +52,51 @@ public class QueueToken implements Comparable<QueueToken> { private long queueWaitTimeout = 0; - private String offerResultDetail; - - private boolean isTimeout = false; - - private final ReentrantLock tokenLock = new ReentrantLock(); - private final Condition tokenCond = tokenLock.newCondition(); - private long queueStartTime = -1; private long queueEndTime = -1; - public QueueToken(TokenState tokenState, long queueWaitTimeout, - String offerResultDetail) { + // Object is just a placeholder, it's meaningless now + private CompletableFuture<Object> future; + private QueryQueue queue; + + public QueueToken(TokenState tokenState, long queueWaitTimeout, QueryQueue queryQueue) { this.tokenId = tokenIdGenerator.addAndGet(1); this.tokenState = tokenState; this.queueWaitTimeout = queueWaitTimeout; - this.offerResultDetail = offerResultDetail; + this.queue = queryQueue; + this.queueStartTime = System.currentTimeMillis(); + this.future = new CompletableFuture<>(); } - public boolean waitSignal(long queryTimeoutMillis) throws InterruptedException { - this.tokenLock.lock(); - try { - if (isTimeout) { - return false; - } - if (tokenState == TokenState.READY_TO_RUN) { - return true; - } - // If query timeout is less than queue wait timeout, then should use - // query timeout as wait timeout - long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? 
queueWaitTimeout : queryTimeoutMillis; - tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS); - if (tokenState == TokenState.CANCELLED) { - this.offerResultDetail = "query is cancelled in queue"; - return false; - } - // If wait timeout and is steal not ready to run, then return false - if (tokenState != TokenState.READY_TO_RUN) { - LOG.warn("wait in queue timeout, timeout = {}", waitTimeout); - isTimeout = true; - return false; - } else { - return true; - } - } catch (Throwable t) { - LOG.warn("meet execption when wait for signal", t); - // If any exception happens, set isTimeout to true and return false - // Then the caller will call returnToken to queue normally. - offerResultDetail = "meet exeption when wait for signal"; - isTimeout = true; - return false; - } finally { - this.tokenLock.unlock(); - this.setQueueTimeWhenQueueEnd(); + public void get(String queryId, int queryTimeout) throws UserException { + if (isReadyToRun()) { + return; } - } - - public void signalForCancel() { - this.tokenLock.lock(); + long waitTimeout = Math.min(queueWaitTimeout, queryTimeout); try { - if (this.tokenState == TokenState.ENQUEUE_SUCCESS) { - tokenCond.signal(); - this.tokenState = TokenState.CANCELLED; - } + future.get(waitTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + queue.removeToken(this); + throw new UserException("query queue timeout, timeout: " + waitTimeout + " ms "); + } catch (CancellationException e) { + queue.removeToken(this); + throw new UserException("query is cancelled"); } catch (Throwable t) { - LOG.warn("error happens when signal for cancel", t); - } finally { - this.tokenLock.unlock(); + queue.removeToken(this); + String errMsg = String.format("error happens when query {} queue", queryId); + LOG.error(errMsg, t); + throw new RuntimeException(errMsg, t); } } - public boolean signal() { - this.tokenLock.lock(); - try { - // If current token is not ENQUEUE_SUCCESS, then it maybe has error - // not run it any more. 
- if (this.tokenState != TokenState.ENQUEUE_SUCCESS || isTimeout) { - return false; - } - this.tokenState = TokenState.READY_TO_RUN; - tokenCond.signal(); - return true; - } catch (Throwable t) { - isTimeout = true; - offerResultDetail = "meet exception when signal"; - LOG.warn("failed to signal token", t); - return false; - } finally { - this.tokenLock.unlock(); - } - } - - public String getOfferResultDetail() { - return offerResultDetail; - } - - public boolean isReadyToRun() { - return this.tokenState == TokenState.READY_TO_RUN; - } - - public boolean isCancelled() { - return this.tokenState == TokenState.CANCELLED; - } - - public void setQueueTimeWhenOfferSuccess() { - long currentTime = System.currentTimeMillis(); - this.queueStartTime = currentTime; - this.queueEndTime = currentTime; - } - - public void setQueueTimeWhenQueueSuccess() { - long currentTime = System.currentTimeMillis(); - this.queueStartTime = currentTime; + public void complete() { + this.queueEndTime = System.currentTimeMillis(); + this.tokenState = TokenState.READY_TO_RUN; + future.complete(null); } - public void setQueueTimeWhenQueueEnd() { - this.queueEndTime = System.currentTimeMillis(); + public void cancel() { + future.cancel(true); } public long getQueueStartTime() { @@ -180,6 +111,10 @@ public class QueueToken implements Comparable<QueueToken> { return tokenState; } + public boolean isReadyToRun() { + return tokenState == TokenState.READY_TO_RUN; + } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 62eb762ff9b..95763631d1e 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -332,7 +332,7 @@ suite("test_crud_wlg") { test { sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from ${table_name};" - exception "query 
wait timeout" + exception "query queue timeout" } // test query queue running query/waiting num --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org