This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d38388453b10a636f1b38f16017fc00e44aa6f64 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Fri Apr 12 14:36:44 2024 +0800 [fix](timeout) query timeout was not correctly set (#33444) --- .../main/java/org/apache/doris/qe/Coordinator.java | 2 +- .../java/org/apache/doris/qe/ResultReceiver.java | 41 ++++++++++++++-------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 443c8fb5252..3ae1e5723ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1499,7 +1499,7 @@ public class Coordinator implements CoordInterface { private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) { if (null != receiver) { - receiver.cancel(cancelReason.toString()); + receiver.cancel(cancelReason); } if (null != pointExec) { pointExec.cancel(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index c473d74b919..275ba0ffd78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -42,8 +42,10 @@ import java.util.concurrent.TimeoutException; public class ResultReceiver { private static final Logger LOG = LogManager.getLogger(ResultReceiver.class); - private boolean isDone = false; - private boolean isCancel = false; + private boolean isDone = false; + // runStatus represents the running status of the ResultReceiver. + // If it is not "OK," it indicates cancel. + private Status runStatus = new Status(); private long packetIdx = 0; private long timeoutTs = 0; private TNetworkAddress address; @@ -56,6 +58,14 @@ public class ResultReceiver { int maxMsgSizeOfResultReceiver; + private void setRunStatus(Status status) { + runStatus.setStatus(status); + } + + private boolean isCancel() { + return !runStatus.ok(); + } + public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs, int maxMsgSizeOfResultReceiver) { this.queryId = Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build(); @@ -72,15 +82,14 @@ public class ResultReceiver { } final RowBatch rowBatch = new RowBatch(); try { - while (!isDone && !isCancel) { + while (!isDone && !isCancel()) { InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder() .setFinstId(finstId) .setRespInAttachment(false) .build(); currentThread = Thread.currentThread(); - fetchDataAsyncFuture - = BackendServiceProxy.getInstance().fetchDataAsync(address, request); + fetchDataAsyncFuture = BackendServiceProxy.getInstance().fetchDataAsync(address, request); InternalService.PFetchDataResult pResult = null; while (pResult == null) { @@ -92,7 +101,7 @@ public class ResultReceiver { pResult = fetchDataAsyncFuture.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); } catch (CancellationException e) { LOG.warn("Future of ResultReceiver of query {} is cancelled", DebugUtil.printId(this.queryId)); - if (!isCancel) { + if (!isCancel()) { LOG.warn("ResultReceiver is not set to cancelled state, this should not happen"); } else { status.setStatus(new Status(TStatusCode.CANCELLED, this.cancelReason)); @@ -101,7 +110,7 @@ public class ResultReceiver { } catch (TimeoutException e) { LOG.warn("Query {} get result timeout, get result duration {} ms", DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000); - isCancel = true; + setRunStatus(Status.TIMEOUT); status.setStatus(Status.TIMEOUT); updateCancelReason("fetch data timeout"); return null; @@ -109,7 +118,7 @@ public class ResultReceiver { // continue to get result LOG.warn("Future of ResultReceiver of query {} got interrupted Exception", DebugUtil.printId(this.queryId), e); - if (isCancel) { + if (isCancel()) { status.setStatus(Status.CANCELLED); return null; } @@ -148,7 +157,7 @@ public class ResultReceiver { } catch (TException e) { if (e.getMessage().contains("MaxMessageSize reached")) { throw new TException( - "MaxMessageSize reached, try increase max_msg_size_of_result_receiver"); + "MaxMessageSize reached, try increase max_msg_size_of_result_receiver"); } else { throw e; } @@ -181,8 +190,8 @@ public class ResultReceiver { } } - if (isCancel) { - status.setStatus(Status.CANCELLED); + if ((isCancel())) { + status.setStatus(runStatus); } return rowBatch; } @@ -196,9 +205,13 @@ public class ResultReceiver { } } - public void cancel(String reason) { - isCancel = true; - updateCancelReason(reason); + public void cancel(Types.PPlanFragmentCancelReason reason) { + if (reason == Types.PPlanFragmentCancelReason.TIMEOUT) { + setRunStatus(Status.TIMEOUT); + } else { + setRunStatus(Status.CANCELLED); + } + updateCancelReason(reason.toString()); synchronized (this) { if (currentThread != null) { // TODO(cmy): we cannot interrupt this thread, or we may throw --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org