This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a7f05caf86 [refactor](status) refactor querycontext and runtime state status (#35035)
9a7f05caf86 is described below

commit 9a7f05caf865bfac301ef2c62ba01aa4a3b03801
Author: yiguolei <676222...@qq.com>
AuthorDate: Mon May 20 13:12:53 2024 +0800

    [refactor](status) refactor querycontext and runtime state status (#35035)
    
    
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/common/status.h                             |  6 ++--
 be/src/olap/delta_writer_v2.cpp                    |  2 +-
 .../local_exchange_sink_operator.cpp               |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 10 ++----
 be/src/pipeline/pipeline_fragment_context.h        | 10 ------
 be/src/runtime/fragment_mgr.cpp                    |  8 ++---
 be/src/runtime/group_commit_mgr.cpp                |  4 +--
 be/src/runtime/plan_fragment_executor.cpp          |  5 ++-
 be/src/runtime/query_context.cpp                   | 19 ++++-------
 be/src/runtime/query_context.h                     | 29 ++++-------------
 be/src/runtime/runtime_state.cpp                   | 19 ++---------
 be/src/runtime/runtime_state.h                     | 37 ++++------------------
 be/src/udf/udf.cpp                                 |  2 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |  2 +-
 14 files changed, 40 insertions(+), 115 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 6f587d5a28f..f0a02157c22 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -575,7 +575,7 @@ public:
     }
 
     // will copy a new status object to avoid concurrency
-    Status status() {
+    Status status() const {
         std::lock_guard l(mutex_);
         return error_st_;
     }
@@ -583,7 +583,9 @@ public:
 private:
     std::atomic_int16_t error_code_ = 0;
     Status error_st_;
-    std::mutex mutex_;
+    // mutex's lock is not a const method, but we will use this mutex in
+    // some const method, so that it should be mutable.
+    mutable std::mutex mutex_;
 
     AtomicStatus(const AtomicStatus&) = delete;
     void operator=(const AtomicStatus&) = delete;
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 34c03fee95e..80978280b92 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -156,7 +156,7 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<ui
                         { memtable_flush_running_count_limit = 0; });
         while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) {
             if (_state->is_cancelled()) {
-                return Status::Cancelled(_state->cancel_reason());
+                return _state->cancel_reason();
             }
             std::this_thread::sleep_for(std::chrono::milliseconds(10));
         }
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 9f6cb670b7d..043b66b2e08 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -54,7 +54,7 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu
     if (exec_status.ok()) {
         DCHECK(_release_count) << "Do not finish correctly! " << debug_string(0)
                                << " state: { cancel = " << state->is_cancelled() << ", "
-                               << state->query_status().to_string() << "} query ctx: { cancel = "
+                               << state->cancel_reason().to_string() << "} query ctx: { cancel = "
                                << state->get_query_ctx()->is_cancelled() << ", "
                                << state->get_query_ctx()->exec_status().to_string() << "}";
     }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index dfc8ab434a7..8f90e2389ab 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1441,8 +1441,8 @@ Status PipelineFragmentContext::submit() {
         for (auto& t : task) {
             st = scheduler->schedule_task(t.get());
             if (!st) {
-                std::lock_guard<std::mutex> l(_status_lock);
                 cancel(Status::InternalError("submit context to executor fail"));
+                std::lock_guard<std::mutex> l(_task_mutex);
                 _total_tasks = submit_tasks;
                 break;
             }
@@ -1539,12 +1539,7 @@ void PipelineFragmentContext::close_a_pipeline() {
 }
 
 Status PipelineFragmentContext::send_report(bool done) {
-    Status exec_status = Status::OK();
-    {
-        std::lock_guard<std::mutex> l(_status_lock);
-        exec_status = _query_ctx->exec_status();
-    }
-
+    Status exec_status = _query_ctx->exec_status();
     // If plan is done successfully, but _is_report_success is false,
     // no need to send report.
     if (!_is_report_success && done && exec_status.ok()) {
@@ -1577,7 +1572,6 @@ Status PipelineFragmentContext::send_report(bool done) {
                              TUniqueId(),
                              -1,
                              _runtime_state.get(),
-                             [this](Status st) { return update_status(st); },
                              [this](const Status& reason) { cancel(reason); }};
 
     return _report_status_cb(
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 383706ee9e5..f4e324b6f53 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -107,14 +107,6 @@ public:
 
     Status send_report(bool);
 
-    Status update_status(Status status) {
-        std::lock_guard<std::mutex> l(_status_lock);
-        if (!status.ok() && _query_ctx->exec_status().ok()) {
-            _query_ctx->set_exec_status(status);
-        }
-        return _query_ctx->exec_status();
-    }
-
     void trigger_report_if_necessary();
     void refresh_next_report_time();
 
@@ -207,8 +199,6 @@ private:
     std::atomic_bool _prepared = false;
     bool _submitted = false;
 
-    std::mutex _status_lock;
-
     Pipelines _pipelines;
     PipelineId _next_pipeline_id = 0;
     std::mutex _task_mutex;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index faefadfc49a..d910dfe97d5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -211,14 +211,14 @@ Status FragmentMgr::trigger_pipeline_context_report(
 // including the final status when execution finishes.
 void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
     DCHECK(req.status.ok() || req.done); // if !status.ok() => done
-    Status exec_status = req.update_fn(req.status);
+    Status exec_status = req.status;
     Status coord_status;
     FrontendServiceConnection coord(_exec_env->frontend_client_cache(), 
req.coord_addr,
                                     &coord_status);
     if (!coord_status.ok()) {
         std::stringstream ss;
         UniqueId uid(req.query_id.hi, req.query_id.lo);
-        static_cast<void>(req.update_fn(Status::InternalError(
+        static_cast<void>(req.cancel_fn(Status::InternalError(
                 "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
                 PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
         return;
@@ -438,7 +438,6 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
 
             if (!rpc_status.ok()) {
                 // we need to cancel the execution of this fragment
-                static_cast<void>(req.update_fn(rpc_status));
                 req.cancel_fn(rpc_status);
                 return;
             }
@@ -455,7 +454,6 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
         LOG_INFO("Going to cancel instance {} since report exec status got rpc 
failed: {}",
                  print_id(req.fragment_instance_id), rpc_status.to_string());
         // we need to cancel the execution of this fragment
-        static_cast<void>(req.update_fn(rpc_status));
         req.cancel_fn(rpc_status);
     }
 }
@@ -589,7 +587,7 @@ Status FragmentMgr::start_query_execution(const 
PExecPlanFragmentStartRequest* r
                 "timeout or be cancelled. host: {}",
                 BackendOptions::get_localhost());
     }
-    search->second->set_ready_to_execute(false);
+    search->second->set_ready_to_execute(Status::OK());
     return Status::OK();
 }
 
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 62fbbd37979..7c182814e12 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -57,7 +57,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
         }
     }
     if (UNLIKELY(runtime_state->is_cancelled())) {
-        return Status::Cancelled<false>(runtime_state->cancel_reason());
+        return runtime_state->cancel_reason();
     }
     RETURN_IF_ERROR(status);
     if (block->rows() > 0) {
@@ -134,7 +134,7 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
         _get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
     }
     if (runtime_state->is_cancelled()) {
-        auto st = Status::Cancelled<false>(runtime_state->cancel_reason());
+        auto st = runtime_state->cancel_reason();
         _cancel_without_lock(st);
         return st;
     }
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 64bc0cde977..304a633773a 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -502,7 +502,6 @@ void PlanFragmentExecutor::send_report(bool done) {
             _fragment_instance_id,
             _backend_num,
             _runtime_state.get(),
-            std::bind(&PlanFragmentExecutor::update_status, this, std::placeholders::_1),
             std::bind(&PlanFragmentExecutor::cancel, this, std::placeholders::_1)};
     // This will send a report even if we are cancelled.  If the query 
completed correctly
     // but fragments still need to be cancelled (e.g. limit reached), the 
coordinator will
@@ -550,9 +549,9 @@ void PlanFragmentExecutor::cancel(const Status& reason) {
     if (reason.is<ErrorCode::LIMIT_REACH>()) {
         _is_report_on_cancel = false;
     }
-    _runtime_state->set_is_cancelled(reason.to_string());
+    _runtime_state->cancel(reason);
     // To notify wait_for_start()
-    _query_ctx->set_ready_to_execute(true);
+    _query_ctx->set_ready_to_execute(reason);
 
     // must close stream_mgr to avoid dead lock in Exchange Node
     _exec_env->vstream_mgr()->cancel(_fragment_instance_id, reason);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c141fc3b223..5360fbe4e4b 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -182,17 +182,15 @@ QueryContext::~QueryContext() {
     LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id), 
mem_tracker_msg);
 }
 
-void QueryContext::set_ready_to_execute(bool is_cancelled) {
+void QueryContext::set_ready_to_execute(Status reason) {
     set_execution_dependency_ready();
     {
         std::lock_guard<std::mutex> l(_start_lock);
-        if (!_is_cancelled) {
-            _is_cancelled = is_cancelled;
-        }
+        _exec_status.update(reason);
         _ready_to_execute = true;
     }
-    if (query_mem_tracker && is_cancelled) {
-        query_mem_tracker->set_is_query_cancelled(is_cancelled);
+    if (query_mem_tracker && !reason.ok()) {
+        query_mem_tracker->set_is_query_cancelled(!reason.ok());
     }
     _start_cond.notify_all();
 }
@@ -211,16 +209,11 @@ void QueryContext::set_execution_dependency_ready() {
 }
 
 void QueryContext::cancel(Status new_status, int fragment_id) {
-    // we must get this wrong status once query ctx's `_is_cancelled` = true.
-    set_exec_status(new_status);
-    // Just for CAS need a left value
-    bool false_cancel = false;
-    if (!_is_cancelled.compare_exchange_strong(false_cancel, true)) {
+    if (!_exec_status.update(new_status)) {
         return;
     }
-    DCHECK(!false_cancel && _is_cancelled);
 
-    set_ready_to_execute(true);
+    set_ready_to_execute(new_status);
     std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> 
ctx_to_cancel;
     {
         std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index bc2d8fbee7c..37f17b21c87 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -61,7 +61,6 @@ struct ReportStatusRequest {
     TUniqueId fragment_instance_id;
     int backend_num;
     RuntimeState* runtime_state;
-    std::function<Status(Status)> update_fn;
     std::function<void(const Status&)> cancel_fn;
 };
 
@@ -103,9 +102,9 @@ public:
 
     ThreadPoolToken* get_token() { return _thread_token.get(); }
 
-    void set_ready_to_execute(bool is_cancelled);
+    void set_ready_to_execute(Status reason);
 
-    [[nodiscard]] bool is_cancelled() const { return _is_cancelled.load(); }
+    [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
 
     void cancel_all_pipeline_context(const Status& reason);
     Status cancel_pipeline_context(const int fragment_id, const Status& 
reason);
@@ -113,21 +112,9 @@ public:
                               
std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
     void cancel(Status new_status, int fragment_id = -1);
 
-    void set_exec_status(Status new_status) {
-        if (new_status.ok()) {
-            return;
-        }
-        std::lock_guard<std::mutex> l(_exec_status_lock);
-        if (!_exec_status.ok()) {
-            return;
-        }
-        _exec_status = new_status;
-    }
+    void set_exec_status(Status new_status) { _exec_status.update(new_status); }
 
-    [[nodiscard]] Status exec_status() {
-        std::lock_guard<std::mutex> l(_exec_status_lock);
-        return _exec_status;
-    }
+    [[nodiscard]] Status exec_status() { return _exec_status.status(); }
 
     void set_execution_dependency_ready();
 
@@ -141,10 +128,10 @@ public:
     bool wait_for_start() {
         int wait_time = config::max_fragment_start_wait_time_seconds;
         std::unique_lock<std::mutex> l(_start_lock);
-        while (!_ready_to_execute.load() && !_is_cancelled.load() && --wait_time > 0) {
+        while (!_ready_to_execute.load() && _exec_status.ok() && --wait_time > 0) {
             _start_cond.wait_for(l, std::chrono::seconds(1));
         }
-        return _ready_to_execute.load() && !_is_cancelled.load();
+        return _ready_to_execute.load() && _exec_status.ok();
     }
 
     std::shared_ptr<vectorized::SharedHashTableController> 
get_shared_hash_table_controller() {
@@ -313,7 +300,6 @@ private:
     // Only valid when _need_wait_execution_trigger is set to true in 
PlanFragmentExecutor.
     // And all fragments of this query will start execution when this is set 
to true.
     std::atomic<bool> _ready_to_execute {false};
-    std::atomic<bool> _is_cancelled {false};
 
     void _init_query_mem_tracker();
 
@@ -325,10 +311,9 @@ private:
     std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
     const TQueryOptions _query_options;
 
-    std::mutex _exec_status_lock;
     // All pipeline tasks use the same query context to report status. So we 
need a `_exec_status`
     // to report the real message if failed.
-    Status _exec_status = Status::OK();
+    AtomicStatus _exec_status;
 
     doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
     vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index b5974ebd1da..ac560c2c7e1 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -58,7 +58,6 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& 
fragment_exec_params,
           _data_stream_recvrs_pool(new ObjectPool()),
           _unreported_error_idx(0),
           _query_id(fragment_exec_params.query_id),
-          _is_cancelled(false),
           _per_fragment_instance_idx(0),
           _num_rows_load_total(0),
           _num_rows_load_filtered(0),
@@ -114,7 +113,6 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, 
const TUniqueId& query_
           _unreported_error_idx(0),
           _query_id(query_id),
           _fragment_id(fragment_id),
-          _is_cancelled(false),
           _per_fragment_instance_idx(0),
           _num_rows_load_total(0),
           _num_rows_load_filtered(0),
@@ -152,7 +150,6 @@ 
RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId&
           _unreported_error_idx(0),
           _query_id(query_id),
           _fragment_id(fragment_id),
-          _is_cancelled(false),
           _per_fragment_instance_idx(0),
           _num_rows_load_total(0),
           _num_rows_load_filtered(0),
@@ -186,7 +183,6 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, 
int32_t fragment_id,
           _unreported_error_idx(0),
           _query_id(query_id),
           _fragment_id(fragment_id),
-          _is_cancelled(false),
           _per_fragment_instance_idx(0),
           _num_rows_load_total(0),
           _num_rows_load_filtered(0),
@@ -219,7 +215,6 @@ RuntimeState::RuntimeState(const TQueryGlobals& 
query_globals)
           _obj_pool(new ObjectPool()),
           _data_stream_recvrs_pool(new ObjectPool()),
           _unreported_error_idx(0),
-          _is_cancelled(false),
           _per_fragment_instance_idx(0) {
     _query_options.batch_size = DEFAULT_BATCH_SIZE;
     if (query_globals.__isset.time_zone && query_globals.__isset.nano_seconds) 
{
@@ -254,7 +249,6 @@ RuntimeState::RuntimeState()
           _obj_pool(new ObjectPool()),
           _data_stream_recvrs_pool(new ObjectPool()),
           _unreported_error_idx(0),
-          _is_cancelled(false),
           _per_fragment_instance_idx(0) {
     _query_options.batch_size = DEFAULT_BATCH_SIZE;
     _timezone = TimezoneUtils::default_time_zone;
@@ -358,20 +352,13 @@ void 
RuntimeState::get_unreported_errors(std::vector<std::string>* new_errors) {
     }
 }
 
-Status RuntimeState::query_status() {
-    auto st = _query_ctx->exec_status();
-    RETURN_IF_ERROR(st);
-    std::lock_guard<std::mutex> l(_process_status_lock);
-    return _process_status;
-}
-
 bool RuntimeState::is_cancelled() const {
     // Maybe we should just return _is_cancelled.load()
-    return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled());
+    return !_exec_status.ok() || (_query_ctx && _query_ctx->is_cancelled());
 }
 
-std::string RuntimeState::cancel_reason() const {
-    return _cancel_reason;
+Status RuntimeState::cancel_reason() const {
+    return _exec_status.status();
 }
 
 const int64_t MAX_ERROR_NUM = 50;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f2e2c887571..d5ebac0f3fc 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -190,7 +190,6 @@ public:
         return _query_options.__isset.mysql_row_binary_format &&
                _query_options.mysql_row_binary_format;
     }
-    Status query_status();
 
     // Appends error to the _error_log if there is space
     bool log_error(const std::string& error);
@@ -206,21 +205,19 @@ public:
     void get_unreported_errors(std::vector<std::string>* new_errors);
 
     [[nodiscard]] bool is_cancelled() const;
-    std::string cancel_reason() const;
-    int codegen_level() const { return _query_options.codegen_level; }
-    void set_is_cancelled(std::string msg) {
-        if (!_is_cancelled.exchange(true)) {
-            _cancel_reason = msg;
+    Status cancel_reason() const;
+    void cancel(const Status& reason) {
+        if (_exec_status.update(reason)) {
             // Create a error status, so that we could print error stack, and
             // we could know which path call cancel.
             LOG(WARNING) << "Task is cancelled, instance: "
                          << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
-                         << ", st = " << Status::Error<ErrorCode::CANCELLED>(msg);
+                         << ", st = " << reason;
         } else {
             LOG(WARNING) << "Task is already cancelled, instance: "
                          << PrintInstanceStandardInfo(_query_id, _fragment_instance_id)
-                         << ", original cancel msg: " << _cancel_reason
-                         << ", new cancel msg: " << Status::Error<ErrorCode::CANCELLED>(msg);
+                         << ", original cancel msg: " << _exec_status.status()
+                         << ", new cancel msg: " << reason;
         }
     }
 
@@ -230,18 +227,6 @@ public:
     void set_be_number(int be_number) { _be_number = be_number; }
     int be_number(void) const { return _be_number; }
 
-    // Sets _process_status with err_msg if no error has been set yet.
-    void set_process_status(const Status& status) {
-        if (status.ok()) {
-            return;
-        }
-        std::lock_guard<std::mutex> l(_process_status_lock);
-        if (!_process_status.ok()) {
-            return;
-        }
-        _process_status = status;
-    }
-
     std::vector<std::string>& output_files() { return _output_files; }
 
     void set_import_label(const std::string& import_label) { _import_label = 
import_label; }
@@ -693,9 +678,7 @@ private:
     TQueryOptions _query_options;
     ExecEnv* _exec_env = nullptr;
 
-    // if true, execution should stop with a CANCELLED status
-    std::atomic<bool> _is_cancelled;
-    std::string _cancel_reason;
+    AtomicStatus _exec_status;
 
     int _per_fragment_instance_idx;
     int _num_per_fragment_instances = 0;
@@ -709,12 +692,6 @@ private:
     // used as send id
     int _be_number;
 
-    // Non-OK if an error has occurred and query execution should abort. Used only for
-    // asynchronously reporting such errors (e.g., when a UDF reports an error), so this
-    // will not necessarily be set in all error cases.
-    std::mutex _process_status_lock;
-    Status _process_status;
-
     // put here to collect files??
     std::vector<std::string> _output_files;
     std::atomic<int64_t> _num_rows_load_total;      // total rows read from 
source
diff --git a/be/src/udf/udf.cpp b/be/src/udf/udf.cpp
index 6dd1b5112d0..3e04208e59a 100644
--- a/be/src/udf/udf.cpp
+++ b/be/src/udf/udf.cpp
@@ -84,7 +84,7 @@ void FunctionContext::set_error(const char* error_msg) {
         ss << "UDF ERROR: " << error_msg;
 
         if (_state != nullptr) {
-            _state->set_process_status(Status::InternalError(ss.str()));
+            _state->cancel(Status::InternalError(ss.str()));
         }
     }
 }
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index e4065a66618..621a9ad1131 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -905,7 +905,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
     _close_time_ms = UnixMillis() - _close_time_ms;
 
     if (_cancelled || state->is_cancelled()) {
-        cancel(state->cancel_reason());
+        cancel(state->cancel_reason().to_string());
     }
 
     if (_add_batches_finished) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to