This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3c586991b05 [fix](cancel) Fix cancel reason on master (#41867) 3c586991b05 is described below commit 3c586991b05143043a954f32e9d0ff9f95c03315 Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Wed Oct 16 08:38:23 2024 +0800 [fix](cancel) Fix cancel reason on master (#41867) pick fix https://github.com/apache/doris/pull/41798 on master. --- be/src/runtime/buffer_control_block.cpp | 6 +++--- be/src/runtime/buffer_control_block.h | 4 ++-- be/src/runtime/result_buffer_mgr.cpp | 7 ++++--- be/src/runtime/result_buffer_mgr.h | 2 +- fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java | 4 ++-- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 2 +- fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java | 2 +- 7 files changed, 14 insertions(+), 13 deletions(-) diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 61ea5ef080d..6420f533e42 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -104,7 +104,7 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int } BufferControlBlock::~BufferControlBlock() { - cancel(); + cancel(Status::Cancelled("Cancelled")); } Status BufferControlBlock::init() { @@ -275,12 +275,12 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { return Status::OK(); } -void BufferControlBlock::cancel() { +void BufferControlBlock::cancel(const Status& reason) { std::unique_lock<std::mutex> l(_lock); _is_cancelled = true; _arrow_data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { - ctx->on_failure(Status::Cancelled("Cancelled")); + ctx->on_failure(reason); } _waiting_rpc.clear(); _update_dependency(); diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 8b45552b2fa..4aff1accbd5 100644 --- a/be/src/runtime/buffer_control_block.h +++ 
b/be/src/runtime/buffer_control_block.h @@ -85,8 +85,8 @@ public: // close buffer block, set _status to exec_status and set _is_close to true; // called because data has been read or error happened. Status close(const TUniqueId& id, Status exec_status); - // this is called by RPC, called from coordinator - void cancel(); + + void cancel(const Status& reason); [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; } diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index ccbf0c3ff67..86ef1efe63f 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -32,6 +32,7 @@ #include "arrow/record_batch.h" #include "arrow/type_fwd.h" +#include "common/status.h" #include "runtime/buffer_control_block.h" #include "util/doris_metrics.h" #include "util/metrics.h" @@ -144,13 +145,13 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id, return Status::OK(); } -void ResultBufferMgr::cancel(const TUniqueId& query_id) { +void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) { { std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock); auto iter = _buffer_map.find(query_id); if (_buffer_map.end() != iter) { - iter->second->cancel(); + iter->second->cancel(reason); _buffer_map.erase(iter); } } @@ -200,7 +201,7 @@ void ResultBufferMgr::cancel_thread() { // cancel query for (const auto& id : query_to_cancel) { - cancel(id); + cancel(id, Status::TimedOut("Query tiemout")); } } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1))); diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 8bac69c23ac..7534cd5c791 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -71,7 +71,7 @@ public: std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& query_id); // cancel - void cancel(const TUniqueId& fragment_id); + void cancel(const TUniqueId& query_id, 
const Status& reason); // cancel one query at a future time. void cancel_at_time(time_t cancel_time, const TUniqueId& query_id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 579fdeff8e7..783844f12f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -959,7 +959,7 @@ public class ConnectContext { closeChannel(); } // Now, cancel running query. - cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by user")); + cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by user from " + getRemoteHostPortString())); } // kill operation with no protect by timeout. @@ -1015,7 +1015,7 @@ public class ConnectContext { long timeout = getExecTimeout() * 1000L; if (delta > timeout) { LOG.warn("kill {} timeout, remote: {}, query timeout: {}, query id: {}", - timeoutTag, getRemoteHostPortString(), timeout, queryId); + timeoutTag, getRemoteHostPortString(), timeout, DebugUtil.printId(queryId)); killFlag = true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index cd083c0450e..5b977a87f1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1190,7 +1190,7 @@ public class Coordinator implements CoordInterface { throw new RpcException(null, copyStatus.getErrorMsg()); } else { String errMsg = copyStatus.getErrorMsg(); - LOG.warn("query failed: {}", errMsg); + LOG.warn("Query {} failed: {}", DebugUtil.printId(queryId), errMsg); throw new UserException(errMsg); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index b591a5d3c6f..ffbc9744ca4 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -187,7 +187,7 @@ public class ResultReceiver { } } catch (TimeoutException e) { LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e); - status.updateStatus(TStatusCode.TIMEOUT, "query timeout"); + status.updateStatus(TStatusCode.TIMEOUT, "Query timeout"); } finally { synchronized (this) { currentThread = null; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org