This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d38388453b10a636f1b38f16017fc00e44aa6f64
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Fri Apr 12 14:36:44 2024 +0800

    [fix](timeout) query timeout was not correctly set  (#33444)
---
 .../main/java/org/apache/doris/qe/Coordinator.java |  2 +-
 .../java/org/apache/doris/qe/ResultReceiver.java   | 41 ++++++++++++++--------
 2 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 443c8fb5252..3ae1e5723ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1499,7 +1499,7 @@ public class Coordinator implements CoordInterface {
 
     private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
         if (null != receiver) {
-            receiver.cancel(cancelReason.toString());
+            receiver.cancel(cancelReason);
         }
         if (null != pointExec) {
             pointExec.cancel();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index c473d74b919..275ba0ffd78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -42,8 +42,10 @@ import java.util.concurrent.TimeoutException;
 
 public class ResultReceiver {
     private static final Logger LOG = LogManager.getLogger(ResultReceiver.class);
-    private boolean isDone    = false;
-    private boolean isCancel  = false;
+    private boolean isDone = false;
+    // runStatus represents the running status of the ResultReceiver.
+    // If it is not "OK," it indicates cancel.
+    private Status runStatus = new Status();
     private long packetIdx = 0;
     private long timeoutTs = 0;
     private TNetworkAddress address;
@@ -56,6 +58,14 @@ public class ResultReceiver {
 
     int maxMsgSizeOfResultReceiver;
 
+    private void setRunStatus(Status status) {
+        runStatus.setStatus(status);
+    }
+
+    private boolean isCancel() {
+        return !runStatus.ok();
+    }
+
     public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs,
             int maxMsgSizeOfResultReceiver) {
         this.queryId = Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
@@ -72,15 +82,14 @@ public class ResultReceiver {
         }
         final RowBatch rowBatch = new RowBatch();
         try {
-            while (!isDone && !isCancel) {
+            while (!isDone && !isCancel()) {
                 InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
                         .setFinstId(finstId)
                         .setRespInAttachment(false)
                         .build();
 
                 currentThread = Thread.currentThread();
-                fetchDataAsyncFuture
-                        = BackendServiceProxy.getInstance().fetchDataAsync(address, request);
+                fetchDataAsyncFuture = BackendServiceProxy.getInstance().fetchDataAsync(address, request);
                 InternalService.PFetchDataResult pResult = null;
 
                 while (pResult == null) {
@@ -92,7 +101,7 @@ public class ResultReceiver {
                         pResult = fetchDataAsyncFuture.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
                     } catch (CancellationException e) {
                         LOG.warn("Future of ResultReceiver of query {} is cancelled", DebugUtil.printId(this.queryId));
-                        if (!isCancel) {
+                        if (!isCancel()) {
                             LOG.warn("ResultReceiver is not set to cancelled state, this should not happen");
                         } else {
                             status.setStatus(new Status(TStatusCode.CANCELLED, this.cancelReason));
@@ -101,7 +110,7 @@ public class ResultReceiver {
                     } catch (TimeoutException e) {
                         LOG.warn("Query {} get result timeout, get result duration {} ms",
                                 DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000);
-                        isCancel = true;
+                        setRunStatus(Status.TIMEOUT);
                         status.setStatus(Status.TIMEOUT);
                         updateCancelReason("fetch data timeout");
                         return null;
@@ -109,7 +118,7 @@ public class ResultReceiver {
                         // continue to get result
                         LOG.warn("Future of ResultReceiver of query {} got interrupted Exception",
                                 DebugUtil.printId(this.queryId), e);
-                        if (isCancel) {
+                        if (isCancel()) {
                             status.setStatus(Status.CANCELLED);
                             return null;
                         }
@@ -148,7 +157,7 @@ public class ResultReceiver {
                     } catch (TException e) {
                         if (e.getMessage().contains("MaxMessageSize reached")) {
                             throw new TException(
-                                "MaxMessageSize reached, try increase max_msg_size_of_result_receiver");
+                                    "MaxMessageSize reached, try increase max_msg_size_of_result_receiver");
                         } else {
                             throw e;
                         }
@@ -181,8 +190,8 @@ public class ResultReceiver {
             }
         }
 
-        if (isCancel) {
-            status.setStatus(Status.CANCELLED);
+        if ((isCancel())) {
+            status.setStatus(runStatus);
         }
         return rowBatch;
     }
@@ -196,9 +205,13 @@ public class ResultReceiver {
         }
     }
 
-    public void cancel(String reason) {
-        isCancel = true;
-        updateCancelReason(reason);
+    public void cancel(Types.PPlanFragmentCancelReason reason) {
+        if (reason == Types.PPlanFragmentCancelReason.TIMEOUT) {
+            setRunStatus(Status.TIMEOUT);
+        } else {
+            setRunStatus(Status.CANCELLED);
+        }
+        updateCancelReason(reason.toString());
         synchronized (this) {
             if (currentThread != null) {
                 // TODO(cmy): we cannot interrupt this thread, or we may throw


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to