This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 057d1f4d511 [bugfix](wg)refactor query queue for robustness (#37642) 057d1f4d511 is described below commit 057d1f4d5118136d696cb64d24f75a53aa041b7a Author: wangbo <wan...@apache.org> AuthorDate: Thu Jul 11 14:30:46 2024 +0800 [bugfix](wg)refactor query queue for robustness (#37642) ## Proposed changes To simplify QueryQueue's code for robustness, redefine QueryQueue's usage as two steps: 1. use QueryQueue.getToken to get a token, whose state may then be either running or queued; 2. release the QueueToken in coordinator.close, which decrements runningQueryNum or removes the token from the waiting queue. We just need to keep these two steps atomic; then whether QueueToken.get succeeds or throws an exception is not important. So this PR removes the ```removeToken``` method and just releases the QueueToken in ```coord.close```. imported: #35929 --- .../src/main/java/org/apache/doris/qe/Coordinator.java | 12 +++++++++--- .../org/apache/doris/resource/workloadgroup/QueryQueue.java | 13 +++---------- .../org/apache/doris/resource/workloadgroup/QueueToken.java | 3 --- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 1c742038b50..e17100e3072 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -660,9 +660,7 @@ public class Coordinator implements CoordInterface { @Override public void close() { - for (ScanNode scanNode : scanNodes) { - scanNode.stop(); - } + // NOTE: all close method should be no exception if (queryQueue != null && queueToken != null) { try { queryQueue.releaseAndNotify(queueToken); @@ -670,6 +668,14 @@ public class Coordinator implements CoordInterface { LOG.error("error happens when coordinator close ", t); } } + + try { + for (ScanNode scanNode : scanNodes) { + scanNode.stop(); + } + } catch 
(Throwable t) { + LOG.error("error happens when scannode stop ", t); + } } private void execInternal() throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index d36042732bf..07d5939cc4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -129,9 +129,11 @@ public class QueryQueue { public void releaseAndNotify(QueueToken releaseToken) { queueLock.lock(); try { - //NOTE:token's tokenState need to be locked by queueLock + // NOTE:token's tokenState need to be locked by queueLock if (releaseToken.isReadyToRun()) { currentRunningQueryNum--; + } else { + priorityTokenQueue.remove(releaseToken); } Preconditions.checkArgument(currentRunningQueryNum >= 0); while (currentRunningQueryNum < maxConcurrency) { @@ -165,13 +167,4 @@ public class QueryQueue { } } - public void removeToken(QueueToken queueToken) { - queueLock.lock(); - try { - priorityTokenQueue.remove(queueToken); - } finally { - queueLock.unlock(); - } - } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java index e9a17ca1ab4..20a46a526f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java @@ -76,13 +76,10 @@ public class QueueToken implements Comparable<QueueToken> { try { future.get(waitTimeout, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { - queue.removeToken(this); throw new UserException("query queue timeout, timeout: " + waitTimeout + " ms "); } catch (CancellationException e) { - queue.removeToken(this); throw new UserException("query is cancelled"); } catch (Throwable t) { - 
queue.removeToken(this); String errMsg = String.format("error happens when query {} queue", queryId); LOG.error(errMsg, t); throw new RuntimeException(errMsg, t); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org