This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0bf13525d7ea7d516f06ab28cbeed946e1fc628f Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Fri Feb 23 01:37:50 2024 +0800 [opt](cancel) Cancel get result future immediately if query is cancelled (#31228) --- .../main/java/org/apache/doris/common/Status.java | 1 + .../main/java/org/apache/doris/qe/Coordinator.java | 7 ++-- .../java/org/apache/doris/qe/ResultReceiver.java | 47 ++++++++++++++++++++-- 3 files changed, 48 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java index 5a7c1e9d63d..1961f9b8cc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java @@ -24,6 +24,7 @@ import org.apache.doris.thrift.TStatusCode; public class Status { public static final Status OK = new Status(); public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled"); + public static final Status TIMEOUT = new Status(TStatusCode.TIMEOUT, "Timeout"); public TStatusCode getErrorCode() { return errorCode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index d0322e375a9..fc9665d5a0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1327,7 +1327,8 @@ public class Coordinator implements CoordInterface { Status status = new Status(); resultBatch = receiver.getNext(status); if (!status.ok()) { - LOG.warn("get next fail, need cancel. query id: {}", DebugUtil.printId(queryId)); + LOG.warn("Query {} coordinator get next fail, {}, need cancel.", + DebugUtil.printId(queryId), status.toString()); } updateStatus(status, null /* no instance id */); @@ -1475,7 +1476,7 @@ public class Coordinator implements CoordInterface { private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) { if (null != receiver) { - receiver.cancel(); + receiver.cancel(cancelReason.toString()); } if (null != pointExec) { pointExec.cancel(); @@ -1487,7 +1488,7 @@ public class Coordinator implements CoordInterface { private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, long backendId) { if (null != receiver) { - receiver.cancel(); + receiver.cancel(cancelReason.toString()); } if (null != pointExec) { pointExec.cancel(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index 1734d3fcfb4..a9e9740963f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -33,6 +33,7 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -49,6 +50,8 @@ public class ResultReceiver { private Types.PUniqueId finstId; private Long backendId; private Thread currentThread; + private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture = null; + public String cancelReason = ""; public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs) { this.queryId = Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build(); @@ -71,25 +74,43 @@ public class ResultReceiver { .build(); currentThread = Thread.currentThread(); - Future<InternalService.PFetchDataResult> future + fetchDataAsyncFuture = BackendServiceProxy.getInstance().fetchDataAsync(address, request); InternalService.PFetchDataResult pResult = null; + while (pResult == null) { long currentTs = System.currentTimeMillis(); if (currentTs >= timeoutTs) { throw new TimeoutException("query timeout, query id = " + DebugUtil.printId(this.queryId)); } try { - pResult = future.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); + pResult = fetchDataAsyncFuture.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); + } catch (CancellationException e) { + LOG.warn("Future of ResultReceiver of query {} is cancelled", DebugUtil.printId(this.queryId)); + if (!isCancel) { + LOG.warn("ResultReceiver is not set to cancelled state, this should not happen"); + } else { + status.setStatus(new Status(TStatusCode.CANCELLED, this.cancelReason)); + return null; + } + } catch (TimeoutException e) { + LOG.warn("Query {} get result timeout, get result duration {} ms", + DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000); + isCancel = true; + status.setStatus(Status.TIMEOUT); + updateCancelReason("fetch data timeout"); + return null; } catch (InterruptedException e) { // continue to get result - LOG.info("future get interrupted Exception", e); + LOG.warn("Future of ResultReceiver of query {} got interrupted Exception", + DebugUtil.printId(this.queryId), e); if (isCancel) { status.setStatus(Status.CANCELLED); return null; } } } + TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode()); if (code != TStatusCode.OK) { status.setPstatus(pResult.getStatus()); @@ -150,8 +171,18 @@ public class ResultReceiver { return rowBatch; } - public void cancel() { + private void updateCancelReason(String reason) { + if (this.cancelReason.isEmpty()) { + this.cancelReason = reason; + } else { + LOG.warn("Query {} already has cancel reason: {}, new reason {} will be ignored", + DebugUtil.printId(queryId), cancelReason, reason); + } + } + + public void cancel(String reason) { isCancel = true; + updateCancelReason(reason); synchronized (this) { if (currentThread != null) { // TODO(cmy): we cannot interrupt this thread, or we may throw @@ -160,6 +191,14 @@ public class ResultReceiver { // And user will lost connection to Palo // currentThread.interrupt(); } + if (fetchDataAsyncFuture != null) { + if (fetchDataAsyncFuture.cancel(true)) { + LOG.info("ResultReceiver of query {} is cancelled", DebugUtil.printId(queryId)); + } else { + LOG.warn("ResultReceiver of query {} cancel failed, typically means the future is finished", + DebugUtil.printId(queryId)); + } + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org