This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0bf13525d7ea7d516f06ab28cbeed946e1fc628f
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Fri Feb 23 01:37:50 2024 +0800

    [opt](cancel) Cancel get result future  immediately if query is cancelled  
(#31228)
---
 .../main/java/org/apache/doris/common/Status.java  |  1 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  7 ++--
 .../java/org/apache/doris/qe/ResultReceiver.java   | 47 ++++++++++++++++++++--
 3 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
index 5a7c1e9d63d..1961f9b8cc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
@@ -24,6 +24,7 @@ import org.apache.doris.thrift.TStatusCode;
 public class Status {
     public static final Status OK = new Status();
     public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, 
"Cancelled");
+    public static final Status TIMEOUT = new Status(TStatusCode.TIMEOUT, 
"Timeout");
 
     public TStatusCode getErrorCode() {
         return errorCode;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d0322e375a9..fc9665d5a0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1327,7 +1327,8 @@ public class Coordinator implements CoordInterface {
         Status status = new Status();
         resultBatch = receiver.getNext(status);
         if (!status.ok()) {
-            LOG.warn("get next fail, need cancel. query id: {}", 
DebugUtil.printId(queryId));
+            LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
+                    DebugUtil.printId(queryId), status.toString());
         }
 
         updateStatus(status, null /* no instance id */);
@@ -1475,7 +1476,7 @@ public class Coordinator implements CoordInterface {
 
     private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
         if (null != receiver) {
-            receiver.cancel();
+            receiver.cancel(cancelReason.toString());
         }
         if (null != pointExec) {
             pointExec.cancel();
@@ -1487,7 +1488,7 @@ public class Coordinator implements CoordInterface {
 
     private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, 
long backendId) {
         if (null != receiver) {
-            receiver.cancel();
+            receiver.cancel(cancelReason.toString());
         }
         if (null != pointExec) {
             pointExec.cancel();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 1734d3fcfb4..a9e9740963f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -33,6 +33,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -49,6 +50,8 @@ public class ResultReceiver {
     private Types.PUniqueId finstId;
     private Long backendId;
     private Thread currentThread;
+    private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture = 
null;
+    public String cancelReason = "";
 
     public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, 
TNetworkAddress address, long timeoutTs) {
         this.queryId = 
Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
@@ -71,25 +74,43 @@ public class ResultReceiver {
                         .build();
 
                 currentThread = Thread.currentThread();
-                Future<InternalService.PFetchDataResult> future
+                fetchDataAsyncFuture
                         = 
BackendServiceProxy.getInstance().fetchDataAsync(address, request);
                 InternalService.PFetchDataResult pResult = null;
+
                 while (pResult == null) {
                     long currentTs = System.currentTimeMillis();
                     if (currentTs >= timeoutTs) {
                         throw new TimeoutException("query timeout, query id = 
" + DebugUtil.printId(this.queryId));
                     }
                     try {
-                        pResult = future.get(timeoutTs - currentTs, 
TimeUnit.MILLISECONDS);
+                        pResult = fetchDataAsyncFuture.get(timeoutTs - 
currentTs, TimeUnit.MILLISECONDS);
+                    } catch (CancellationException e) {
+                        LOG.warn("Future of ResultReceiver of query {} is 
cancelled", DebugUtil.printId(this.queryId));
+                        if (!isCancel) {
+                            LOG.warn("ResultReceiver is not set to cancelled 
state, this should not happen");
+                        } else {
+                            status.setStatus(new Status(TStatusCode.CANCELLED, 
this.cancelReason));
+                            return null;
+                        }
+                    } catch (TimeoutException e) {
+                        LOG.warn("Query {} get result timeout, get result 
duration {} ms",
+                                DebugUtil.printId(this.queryId), (timeoutTs - 
currentTs) / 1000);
+                        isCancel = true;
+                        status.setStatus(Status.TIMEOUT);
+                        updateCancelReason("fetch data timeout");
+                        return null;
                     } catch (InterruptedException e) {
                         // continue to get result
-                        LOG.info("future get interrupted Exception", e);
+                        LOG.warn("Future of ResultReceiver of query {} got 
interrupted Exception",
+                                DebugUtil.printId(this.queryId), e);
                         if (isCancel) {
                             status.setStatus(Status.CANCELLED);
                             return null;
                         }
                     }
                 }
+
                 TStatusCode code = 
TStatusCode.findByValue(pResult.getStatus().getStatusCode());
                 if (code != TStatusCode.OK) {
                     status.setPstatus(pResult.getStatus());
@@ -150,8 +171,18 @@ public class ResultReceiver {
         return rowBatch;
     }
 
-    public void cancel() {
+    private void updateCancelReason(String reason) {
+        if (this.cancelReason.isEmpty()) {
+            this.cancelReason = reason;
+        } else {
+            LOG.warn("Query {} already has cancel reason: {}, new reason {} 
will be ignored",
+                    DebugUtil.printId(queryId), cancelReason, reason);
+        }
+    }
+
+    public void cancel(String reason) {
         isCancel = true;
+        updateCancelReason(reason);
         synchronized (this) {
             if (currentThread != null) {
                 // TODO(cmy): we cannot interrupt this thread, or we may throw
@@ -160,6 +191,14 @@ public class ResultReceiver {
                 // And user will lost connection to Palo
                 // currentThread.interrupt();
             }
+            if (fetchDataAsyncFuture != null) {
+                if (fetchDataAsyncFuture.cancel(true)) {
+                    LOG.info("ResultReceiver of query {} is cancelled", 
DebugUtil.printId(queryId));
+                } else {
+                    LOG.warn("ResultReceiver of query {} cancel failed, 
typically means the future is finished",
+                            DebugUtil.printId(queryId));
+                }
+            }
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to