This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c586991b05 [fix](cancel) Fix cancel reason on master (#41867)
3c586991b05 is described below

commit 3c586991b05143043a954f32e9d0ff9f95c03315
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Wed Oct 16 08:38:23 2024 +0800

    [fix](cancel) Fix cancel reason on master (#41867)
    
    pick fix https://github.com/apache/doris/pull/41798 on master.
---
 be/src/runtime/buffer_control_block.cpp                          | 6 +++---
 be/src/runtime/buffer_control_block.h                            | 4 ++--
 be/src/runtime/result_buffer_mgr.cpp                             | 7 ++++---
 be/src/runtime/result_buffer_mgr.h                               | 2 +-
 fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java | 4 ++--
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java    | 2 +-
 fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java | 2 +-
 7 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index 61ea5ef080d..6420f533e42 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -104,7 +104,7 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int
 }
 
 BufferControlBlock::~BufferControlBlock() {
-    cancel();
+    cancel(Status::Cancelled("Cancelled"));
 }
 
 Status BufferControlBlock::init() {
@@ -275,12 +275,12 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
     return Status::OK();
 }
 
-void BufferControlBlock::cancel() {
+void BufferControlBlock::cancel(const Status& reason) {
     std::unique_lock<std::mutex> l(_lock);
     _is_cancelled = true;
     _arrow_data_arrival.notify_all();
     for (auto& ctx : _waiting_rpc) {
-        ctx->on_failure(Status::Cancelled("Cancelled"));
+        ctx->on_failure(reason);
     }
     _waiting_rpc.clear();
     _update_dependency();
diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index 8b45552b2fa..4aff1accbd5 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -85,8 +85,8 @@ public:
     // close buffer block, set _status to exec_status and set _is_close to true;
     // called because data has been read or error happened.
     Status close(const TUniqueId& id, Status exec_status);
-    // this is called by RPC, called from coordinator
-    void cancel();
+
+    void cancel(const Status& reason);
 
     [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }
 
diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp
index ccbf0c3ff67..86ef1efe63f 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -32,6 +32,7 @@
 
 #include "arrow/record_batch.h"
 #include "arrow/type_fwd.h"
+#include "common/status.h"
 #include "runtime/buffer_control_block.h"
 #include "util/doris_metrics.h"
 #include "util/metrics.h"
@@ -144,13 +145,13 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
     return Status::OK();
 }
 
-void ResultBufferMgr::cancel(const TUniqueId& query_id) {
+void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) {
     {
         std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
         auto iter = _buffer_map.find(query_id);
 
         if (_buffer_map.end() != iter) {
-            iter->second->cancel();
+            iter->second->cancel(reason);
             _buffer_map.erase(iter);
         }
     }
@@ -200,7 +201,7 @@ void ResultBufferMgr::cancel_thread() {
 
         // cancel query
         for (const auto& id : query_to_cancel) {
-            cancel(id);
+            cancel(id, Status::TimedOut("Query tiemout"));
         }
     } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
 
diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h
index 8bac69c23ac..7534cd5c791 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -71,7 +71,7 @@ public:
     std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& query_id);
 
     // cancel
-    void cancel(const TUniqueId& fragment_id);
+    void cancel(const TUniqueId& query_id, const Status& reason);
 
     // cancel one query at a future time.
     void cancel_at_time(time_t cancel_time, const TUniqueId& query_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 579fdeff8e7..783844f12f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -959,7 +959,7 @@ public class ConnectContext {
             closeChannel();
         }
         // Now, cancel running query.
-        cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by user"));
+        cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by user from " + getRemoteHostPortString()));
     }
 
     // kill operation with no protect by timeout.
@@ -1015,7 +1015,7 @@ public class ConnectContext {
             long timeout = getExecTimeout() * 1000L;
             if (delta > timeout) {
                 LOG.warn("kill {} timeout, remote: {}, query timeout: {}, query id: {}",
-                        timeoutTag, getRemoteHostPortString(), timeout, queryId);
+                        timeoutTag, getRemoteHostPortString(), timeout, DebugUtil.printId(queryId));
                 killFlag = true;
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index cd083c0450e..5b977a87f1f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1190,7 +1190,7 @@ public class Coordinator implements CoordInterface {
                 throw new RpcException(null, copyStatus.getErrorMsg());
             } else {
                 String errMsg = copyStatus.getErrorMsg();
-                LOG.warn("query failed: {}", errMsg);
+                LOG.warn("Query {} failed: {}", DebugUtil.printId(queryId), errMsg);
                 throw new UserException(errMsg);
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index b591a5d3c6f..ffbc9744ca4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -187,7 +187,7 @@ public class ResultReceiver {
             }
         } catch (TimeoutException e) {
             LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e);
-            status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
+            status.updateStatus(TStatusCode.TIMEOUT, "Query timeout");
         } finally {
             synchronized (this) {
                 currentThread = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to