This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 864a0f9bcb [opt](pipeline) Make pipeline fragment context send_report asynchronized (#23142)
864a0f9bcb is described below

commit 864a0f9bcb17495f1a9134bb92e64bc538ac3383
Author: Lijia Liu <liutang...@yeah.net>
AuthorDate: Thu Sep 28 17:55:53 2023 +0800

    [opt](pipeline) Make pipeline fragment context send_report asynchronized (#23142)
---
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   1 +
 be/src/exec/exec_node.cpp                          |   3 +-
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |   3 +
 be/src/pipeline/exec/exchange_sink_buffer.h        |   1 +
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  14 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |   7 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 144 ++++++++++++---------
 be/src/pipeline/pipeline_fragment_context.h        |  34 ++---
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  88 +++----------
 .../pipeline_x/pipeline_x_fragment_context.h       |  12 +-
 be/src/pipeline/task_scheduler.cpp                 |  15 ++-
 be/src/runtime/fragment_mgr.cpp                    |  30 ++++-
 be/src/runtime/fragment_mgr.h                      |   3 +
 be/src/runtime/plan_fragment_executor.cpp          |   5 +-
 be/src/runtime/query_context.h                     |   8 +-
 be/src/runtime/runtime_state.cpp                   |   7 +
 be/src/runtime/runtime_state.h                     |   5 +-
 be/src/service/internal_service.cpp                |   7 +-
 be/src/vec/runtime/vdata_stream_mgr.cpp            |  10 +-
 be/src/vec/runtime/vdata_stream_mgr.h              |   2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  24 +++-
 be/src/vec/runtime/vdata_stream_recvr.h            |   7 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  51 ++++----
 be/src/vec/sink/vdata_stream_sender.h              |  33 ++---
 gensrc/proto/internal_service.proto                |   1 +
 27 files changed, 283 insertions(+), 236 deletions(-)

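At a high level, the patch replaces the per-fragment profile-reporting thread with an on-demand trigger: the task scheduler polls trigger_report_if_necessary(), an atomic flag decides whether a report is due, and the actual RPC is handed off to a shared thread pool in FragmentMgr. Below is a condensed sketch of that handshake using simplified stand-in names rather than the real Doris types; the real logic is in the pipeline_fragment_context.cpp and fragment_mgr.cpp hunks further down.

#include <atomic>
#include <cstdint>
#include <functional>

// Stand-in sketch of the trigger/refresh handshake (not the actual Doris classes).
struct FragmentContextSketch {
    std::atomic_bool disable_period_report{true};
    std::atomic_uint64_t previous_report_time_ns{0};

    // Called from the task scheduler loop; cheap no-op when no report is due.
    void trigger_report_if_necessary(uint64_t now_ns, uint64_t interval_ns,
                                     const std::function<void()>& submit_to_report_pool) {
        bool disabled = disable_period_report.load(std::memory_order_acquire);
        if (disabled) {
            return; // reporting is off, or a report is already in flight
        }
        uint64_t next_ns = previous_report_time_ns.load(std::memory_order_acquire) + interval_ns;
        if (now_ns <= next_ns) {
            return; // not due yet
        }
        // Claim the report slot; only one scheduler thread wins the CAS.
        if (!disable_period_report.compare_exchange_strong(disabled, true,
                                                           std::memory_order_acq_rel)) {
            return;
        }
        submit_to_report_pool(); // the RPC runs on a shared thread pool, not here
    }

    // Called by the pool task once a non-final report has been sent.
    void refresh_next_report_time(uint64_t now_ns) {
        previous_report_time_ns.store(now_ns, std::memory_order_release);
        disable_period_report.store(false, std::memory_order_release);
    }
};

The compare_exchange_strong ensures only one scheduler thread submits a report at a time; the flag stays set until the pool task completes and re-arms the timer.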
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 643f6eb23c..e8632d47d9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -225,6 +225,8 @@ DEFINE_mBool(compress_rowbatches, "true");
 DEFINE_mBool(rowbatch_align_tuple_offset, "false");
 // interval between profile reports; in seconds
 DEFINE_mInt32(status_report_interval, "5");
+// Pipeline tasks have high concurrency, so their report frequency is reduced
+DEFINE_mInt32(pipeline_status_report_interval, "10");
 // if true, each disk will have a separate thread pool for scanner
 DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk, "true");
 // the timeout of a work thread to wait the blocking priority queue to get a task
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 349a8a93a9..1189169187 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -271,6 +271,7 @@ DECLARE_mBool(compress_rowbatches);
 DECLARE_mBool(rowbatch_align_tuple_offset);
 // interval between profile reports; in seconds
 DECLARE_mInt32(status_report_interval);
+DECLARE_mInt32(pipeline_status_report_interval);
 // if true, each disk will have a separate thread pool for scanner
 DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk);
 // the timeout of a work thread to wait the blocking priority queue to get a task
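The new pipeline_status_report_interval is consumed later in this patch by PipelineFragmentContext::_init_next_report_time(), which jitters the first report by a random offset so fragments do not all report to the coordinator at once. A minimal sketch of that arithmetic, assuming stand-in names and a monotonic clock in nanoseconds:

#include <cstdint>
#include <cstdlib>

constexpr uint64_t NANOS_PER_SEC = 1000000000ULL;

// Returns the "previous report time" to store so that the first periodic report
// fires after a random offset in [0, interval_s) seconds. interval_s must be > 0,
// and now_ns is assumed to be well past interval_s seconds.
uint64_t initial_previous_report_time(uint64_t now_ns, int32_t interval_s) {
    uint64_t offset_ns = static_cast<uint64_t>(rand() % interval_s) * NANOS_PER_SEC;
    // next report time = previous + interval = now + offset
    return now_ns + offset_ns - static_cast<uint64_t>(interval_s) * NANOS_PER_SEC;
}

With interval_s = 10, the first report lands somewhere in the first 0-9 seconds of execution, and later reports are triggered at least interval_s seconds apart.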
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index c21ae2e5f5..f3870c6346 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -198,7 +198,8 @@ Status ExecNode::close(RuntimeState* state) {
                   << " already closed";
         return Status::OK();
     }
-    LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed";
+    LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << ", "
+              << " id=" << _id << " type=" << print_plan_node_type(_type) << " closed";
     _is_closed = true;
 
     Status result;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index f49fea2c03..85e37ee4a9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -225,6 +225,9 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
         if (request.block) {
             brpc_request->set_allocated_block(request.block.get());
         }
+        if (!request.exec_status.ok()) {
+            request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
+        }
         auto* closure = request.channel->get_closure(id, request.eos, nullptr);
 
         _instance_to_rpc_ctx[id]._closure = closure;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 6086b36c22..d5e530af5c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -94,6 +94,7 @@ struct TransmitInfo {
     vectorized::PipChannel<Parent>* channel;
     std::unique_ptr<PBlock> block;
     bool eos;
+    Status exec_status;
 };
 
 template <typename Parent>
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 1b00293258..5d339a7318 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -278,7 +278,8 @@ template <typename ChannelPtrType>
 void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
                                                 Status st) {
     channel->set_receiver_eof(st);
-    channel->close(state);
+    // Channel will not send RPC to the downstream on EOF, so close the channel with an OK status.
+    channel->close(state, Status::OK());
 }
 
 Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
@@ -337,8 +338,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                                 status = channel->send_local_block(&cur_block);
                             } else {
                                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                                status = channel->send_block(block_holder, &sent,
-                                                             source_state == SourceState::FINISHED);
+                                status = channel->send_broadcast_block(
+                                        block_holder, &sent, source_state == SourceState::FINISHED);
                             }
                             HANDLE_CHANNEL_STATUS(state, channel, status);
                         }
@@ -365,8 +366,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                 RETURN_IF_ERROR(local_state._serializer.serialize_block(
                         block, current_channel->ch_cur_pb_block()));
-                auto status = current_channel->send_block(current_channel->ch_cur_pb_block(),
-                                                          source_state == SourceState::FINISHED);
+                auto status = current_channel->send_remote_block(
+                        current_channel->ch_cur_pb_block(), source_state == SourceState::FINISHED);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
                 current_channel->ch_roll_pb_block();
             }
@@ -520,8 +521,9 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* 
state, Status exec_status)
     CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
     local_state._serializer.reset_block();
     Status final_st = Status::OK();
+    Status final_status = exec_status;
     for (int i = 0; i < local_state.channels.size(); ++i) {
-        Status st = local_state.channels[i]->close(state);
+        Status st = local_state.channels[i]->close(state, exec_status);
         if (!st.ok() && final_st.ok()) {
             final_st = st;
         }
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 26f8fed731..2e375d6908 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -234,8 +234,8 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
                                     status = channel->send_local_block(&cur_block);
                                 } else {
                                     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                                    status =
-                                            channel->send_block(_block_holder.get(), nullptr, true);
+                                    status = channel->send_broadcast_block(_block_holder.get(),
+                                                                           nullptr, true);
                                 }
                                 HANDLE_CHANNEL_STATUS(state, channel, status);
                             }
@@ -256,7 +256,8 @@ template <typename ChannelPtrType>
 void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
                                                    Status st) {
     channel->set_receiver_eof(st);
-    channel->close(state);
+    // Channel will not send RPC to the downstream on EOF, so close the channel with an OK status.
+    channel->close(state, Status::OK());
 }
 
 Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index 424d75b1d9..bba54559fe 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -181,7 +181,7 @@ Status ResultSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     COUNTER_UPDATE(profile()->total_time_counter(),
                    _cancel_dependency->write_watcher_elapse_time());
     SCOPED_TIMER(profile()->total_time_counter());
-    Status final_status = Status::OK();
+    Status final_status = exec_status;
     if (_writer) {
         // close the writer
         Status st = _writer->close();
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 65cd718949..7d4b091cda 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -28,6 +28,9 @@
 #include <pthread.h>
 #include <stdlib.h>
 // IWYU pragma: no_include <bits/chrono.h>
+#include <fmt/format.h>
+#include <fmt/ranges.h>
+
 #include <chrono> // IWYU pragma: keep
 #include <map>
 #include <ostream>
@@ -125,14 +128,12 @@ PipelineFragmentContext::PipelineFragmentContext(
           _exec_env(exec_env),
           _query_ctx(std::move(query_ctx)),
           _call_back(call_back),
-          _report_thread_active(false),
-          _report_status_cb(report_status_cb),
           _is_report_on_cancel(true),
+          _report_status_cb(report_status_cb),
           _group_commit(group_commit) {
     if (_query_ctx->get_task_group()) {
         _task_group_entity = _query_ctx->get_task_group()->task_entity();
     }
-    _report_thread_future = _report_thread_promise.get_future();
     _fragment_watcher.start();
 }
 
@@ -146,16 +147,29 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     } else {
         _call_back(_runtime_state.get(), &st);
     }
-    DCHECK(!_report_thread_active);
 }
 
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
+    LOG_INFO("PipelineFragmentContext::cancel")
+            .tag("query_id", print_id(_query_ctx->query_id()))
+            .tag("fragment_id", _fragment_id)
+            .tag("instance_id", print_id(_runtime_state->fragment_instance_id()))
+            .tag("reason", PPlanFragmentCancelReason_Name(reason))
+            .tag("message", msg);
+
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        LOG(WARNING) << "PipelineFragmentContext "
-                     << PrintInstanceStandardInfo(_query_id, _fragment_id, _fragment_instance_id)
-                     << " is canceled, cancel message: " << msg;
+        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
+            LOG(WARNING) << "PipelineFragmentContext "
+                         << PrintInstanceStandardInfo(_query_id, _fragment_id,
+                                                      _fragment_instance_id)
+                         << " is canceled, cancel message: " << msg;
+
+        } else {
+            _set_is_report_on_cancel(false); // TODO bug llj: fix this, it is not protected by a lock
+        }
 
+        _runtime_state->set_process_status(_query_ctx->exec_status());
         // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
         auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
@@ -164,7 +178,8 @@ void PipelineFragmentContext::cancel(const 
PPlanFragmentCancelReason& reason,
         }
 
         // must close stream_mgr to avoid dead lock in Exchange Node
-        _exec_env->vstream_mgr()->cancel(_fragment_instance_id);
+        // TODO bug llj: fix this, other instances will not be cancelled
+        _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg));
         // Cancel the result queue manager used by spark doris connector
         // TODO pipeline incomp
         // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
@@ -199,6 +214,7 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
 
     LOG_INFO("PipelineFragmentContext::prepare")
             .tag("query_id", print_id(_query_id))
+            .tag("fragment_id", _fragment_id)
             .tag("instance_id", print_id(local_params.fragment_instance_id))
             .tag("backend_num", local_params.backend_num)
             .tag("pthread_id", (uintptr_t)pthread_self());
@@ -311,6 +327,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     _runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr);
     _runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr);
 
+    _init_next_report_time();
+
     _prepared = true;
     return Status::OK();
 }
@@ -344,54 +362,56 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     return Status::OK();
 }
 
-void PipelineFragmentContext::_stop_report_thread() {
-    if (!_report_thread_active) {
-        return;
+void PipelineFragmentContext::_init_next_report_time() {
+    auto interval_s = config::pipeline_status_report_interval;
+    if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second > interval_s) {
+        std::vector<string> ins_ids;
+        instance_ids(ins_ids);
+        VLOG_FILE << "enable period report: instance_id="
+                  << fmt::format("{}", fmt::join(ins_ids, ", "));
+        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
+        // We don't want to wait longer than it takes to run the entire fragment.
+        _previous_report_time =
+                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
+        _disable_period_report = false;
     }
+}
 
-    _report_thread_active = false;
-
-    _stop_report_thread_cv.notify_one();
-    // Wait infinitly to ensure that the report task is finished and the this 
variable
-    // is not used in report thread.
-    _report_thread_future.wait();
+void PipelineFragmentContext::refresh_next_report_time() {
+    auto disable = _disable_period_report.load(std::memory_order_acquire);
+    DCHECK(disable == true);
+    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
+    _disable_period_report.compare_exchange_strong(disable, false);
 }
 
-void PipelineFragmentContext::report_profile() {
-    SCOPED_ATTACH_TASK(_runtime_state.get());
-    VLOG_FILE << "report_profile(): instance_id=" << 
_runtime_state->fragment_instance_id();
-
-    _report_thread_active = true;
-
-    std::unique_lock<std::mutex> l(_report_thread_lock);
-    // tell Open() that we started
-    _report_thread_started_cv.notify_one();
-
-    // Jitter the reporting time of remote fragments by a random amount between
-    // 0 and the report_interval.  This way, the coordinator doesn't get all 
the
-    // updates at once so its better for contention as well as smoother 
progress
-    // reporting.
-    int report_fragment_offset = rand() % config::status_report_interval;
-    // We don't want to wait longer than it takes to run the entire fragment.
-    _stop_report_thread_cv.wait_for(l, 
std::chrono::seconds(report_fragment_offset));
-    while (_report_thread_active) {
-        if (config::status_report_interval > 0) {
-            // wait_for can return because the timeout occurred or the 
condition variable
-            // was signaled.  We can't rely on its return value to distinguish 
between the
-            // two cases (e.g. there is a race here where the wait timed out 
but before grabbing
-            // the lock, the condition variable was signaled).  Instead, we 
will use an external
-            // flag, _report_thread_active, to coordinate this.
-            _stop_report_thread_cv.wait_for(l,
-                                            
std::chrono::seconds(config::status_report_interval));
-        } else {
-            LOG(WARNING) << "config::status_report_interval is equal to or 
less than zero, exiting "
-                            "reporting thread.";
-            break;
+void PipelineFragmentContext::trigger_report_if_necessary() {
+    if (!_is_report_success) {
+        return;
+    }
+    auto disable = _disable_period_report.load(std::memory_order_acquire);
+    if (disable) {
+        return;
+    }
+    int32_t interval_s = config::pipeline_status_report_interval;
+    if (interval_s <= 0) {
+        LOG(WARNING)
+                << "config::pipeline_status_report_interval is equal to or less than zero, do not trigger "
+                   "report.";
+    }
+    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
+                                (uint64_t)(interval_s)*NANOS_PER_SEC;
+    if (MonotonicNanos() > next_report_time) {
+        if (!_disable_period_report.compare_exchange_strong(disable, true,
+                                                            std::memory_order_acq_rel)) {
+            return;
         }
-
         if (VLOG_FILE_IS_ON) {
-            VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : " ")
-                      << "profile for instance " << _runtime_state->fragment_instance_id();
+            std::vector<string> ins_ids;
+            instance_ids(ins_ids);
+            VLOG_FILE << "Reporting "
+                      << "profile for query_id " << print_id(_query_id)
+                      << ", instance ids: " << fmt::format("{}", fmt::join(ins_ids, ", "));
+
             std::stringstream ss;
             _runtime_state->runtime_profile()->compute_time_in_profile();
             _runtime_state->runtime_profile()->pretty_print(&ss);
@@ -401,15 +421,13 @@ void PipelineFragmentContext::report_profile() {
             }
             VLOG_FILE << ss.str();
         }
-
-        if (!_report_thread_active) {
-            break;
+        auto st = send_report(false);
+        if (!st.ok()) {
+            disable = true;
+            _disable_period_report.compare_exchange_strong(disable, false,
+                                                           std::memory_order_acq_rel);
         }
-
-        send_report(false);
     }
-
-    VLOG_FILE << "exiting reporting thread: instance_id=" << 
_runtime_state->fragment_instance_id();
 }
 
 // TODO: use virtual function to do abstraction
@@ -815,7 +833,6 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
 void PipelineFragmentContext::_close_action() {
     _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     send_report(true);
-    _stop_report_thread();
     // all submitted tasks done
     _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this());
 }
@@ -828,7 +845,7 @@ void PipelineFragmentContext::close_a_pipeline() {
     }
 }
 
-void PipelineFragmentContext::send_report(bool done) {
+Status PipelineFragmentContext::send_report(bool done) {
     Status exec_status = Status::OK();
     {
         std::lock_guard<std::mutex> l(_status_lock);
@@ -838,7 +855,7 @@ void PipelineFragmentContext::send_report(bool done) {
     // If plan is done successfully, but _is_report_success is false,
     // no need to send report.
     if (!_is_report_success && done && exec_status.ok()) {
-        return;
+        return Status::NeedSendAgain("");
     }
 
     // If both _is_report_success and _is_report_on_cancel are false,
@@ -846,10 +863,10 @@ void PipelineFragmentContext::send_report(bool done) {
     // This may happen when the query limit reached and
     // a internal cancellation being processed
     if (!_is_report_success && !_is_report_on_cancel) {
-        return;
+        return Status::NeedSendAgain("");
     }
 
-    _report_status_cb(
+    return _report_status_cb(
             {false,
              exec_status,
              {},
@@ -864,7 +881,8 @@ void PipelineFragmentContext::send_report(bool done) {
              _runtime_state.get(),
              std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
              std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
-                       std::placeholders::_2)});
+                       std::placeholders::_2)},
+            shared_from_this());
 }
 
 } // namespace doris::pipeline
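One detail in the hunks above is easy to miss: send_report() now returns a Status, and trigger_report_if_necessary() keys off it. The sketch below spells out that contract with stand-in types (NeedSendAgain is the sentinel the patch returns when a report is skipped); it is an illustration, not the actual Doris code.

#include <atomic>

enum class SendReportResult { Submitted, Skipped }; // stand-in for Status / Status::NeedSendAgain

// Mirrors the tail of trigger_report_if_necessary() above (sketch only).
void handle_send_report_result(SendReportResult r, std::atomic_bool& disable_period_report) {
    if (r == SendReportResult::Submitted) {
        // The shared report pool now owns the report; it re-arms the timer via
        // refresh_next_report_time() when the coordinator callback finishes.
        return;
    }
    // Nothing was submitted (report skipped or the pool rejected the task), so
    // re-enable periodic reporting for the next scheduler pass.
    disable_period_report.store(false, std::memory_order_release);
}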
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 8e80a73143..80c38880bf 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -58,7 +58,8 @@ public:
     // Note: this does not take a const RuntimeProfile&, because it might need to call
     // functions like PrettyPrint() or to_thrift(), neither of which is const
     // because they take locks.
-    using report_status_callback = std::function<void(const ReportStatusRequest)>;
+    using report_status_callback = std::function<Status(
+            const ReportStatusRequest, std::shared_ptr<pipeline::PipelineFragmentContext>&&)>;
     PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id,
                             const int fragment_id, int backend_num,
                             std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
@@ -118,9 +119,7 @@ public:
     virtual void add_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
 
-    virtual void send_report(bool);
-
-    virtual void report_profile();
+    virtual Status send_report(bool);
 
     Status update_status(Status status) {
         std::lock_guard<std::mutex> l(_status_lock);
@@ -133,12 +132,18 @@ public:
     [[nodiscard]] taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
         return _task_group_entity;
     }
+    void trigger_report_if_necessary();
 
     bool is_group_commit() { return _group_commit; }
     virtual void instance_ids(std::vector<TUniqueId>& ins_ids) const {
         ins_ids.resize(1);
         ins_ids[0] = _fragment_instance_id;
     }
+    virtual void instance_ids(std::vector<string>& ins_ids) const {
+        ins_ids.resize(1);
+        ins_ids[0] = print_id(_fragment_instance_id);
+    }
+    void refresh_next_report_time();
 
 protected:
     Status _create_sink(int sender_id, const TDataSink& t_data_sink, 
RuntimeState* state);
@@ -147,7 +152,7 @@ protected:
     template <bool is_intersect>
     Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
     virtual void _close_action();
-    void _stop_report_thread();
+    void _init_next_report_time();
     void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
 
     // Id of this query
@@ -200,22 +205,17 @@ protected:
     std::function<void(RuntimeState*, Status*)> _call_back;
     std::once_flag _close_once_flag;
 
-    std::condition_variable _report_thread_started_cv;
-    // true if we started the thread
-    bool _report_thread_active;
-    // profile reporting-related
-    report_status_callback _report_status_cb;
-    std::promise<bool> _report_thread_promise;
-    std::future<bool> _report_thread_future;
-    std::mutex _report_thread_lock;
-
-    // Indicates that profile reporting thread should stop.
-    // Tied to _report_thread_lock.
-    std::condition_variable _stop_report_thread_cv;
     // If this is set to false, and '_is_report_success' is false as well,
     // This executor will not report status to FE on being cancelled.
     bool _is_report_on_cancel;
 
+    // 0 indicates reporting is in progress or not required
+    std::atomic_bool _disable_period_report = true;
+    std::atomic_uint64_t _previous_report_time = 0;
+
+    // profile reporting-related
+    report_status_callback _report_status_cb;
+
 private:
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
     bool _group_commit;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 64ea086e9a..9369b26ba7 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -119,13 +119,22 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
         _call_back(nullptr, &st);
     }
     _runtime_state.reset();
-    DCHECK(!_report_thread_active);
 }
 
 void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                       const std::string& msg) {
+    LOG_INFO("PipelineXFragmentContext::cancel")
+            .tag("query_id", print_id(_query_id))
+            .tag("fragment_id", _fragment_id)
+            .tag("reason", reason)
+            .tag("error message", msg);
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
+        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
+            FOR_EACH_RUNTIME_STATE(LOG(WARNING) << "PipelineXFragmentContext cancel instance: "
+                                                << print_id(runtime_state->fragment_instance_id());)
+        } else {
+            _set_is_report_on_cancel(false); // TODO bug llj
+        }
         // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
         auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
@@ -156,7 +165,7 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     }
 
     LOG_INFO("PipelineXFragmentContext::prepare")
-            .tag("query_id", _query_id)
+            .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id)
             .tag("pthread_id", (uintptr_t)pthread_self());
 
@@ -215,6 +224,8 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     // 5. Build pipeline tasks and initialize local state.
     RETURN_IF_ERROR(_build_pipeline_tasks(request));
 
+    _init_next_report_time();
+
     _prepared = true;
     return Status::OK();
 }
@@ -428,64 +439,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
     return Status::OK();
 }
 
-void PipelineXFragmentContext::report_profile() {
-    FOR_EACH_RUNTIME_STATE(
-            SCOPED_ATTACH_TASK(runtime_state.get());
-            VLOG_FILE << "report_profile(): instance_id=" << 
runtime_state->fragment_instance_id();
-
-            _report_thread_active = true;
-
-            std::unique_lock<std::mutex> l(_report_thread_lock);
-            // tell Open() that we started
-            _report_thread_started_cv.notify_one();
-
-            // Jitter the reporting time of remote fragments by a random 
amount between
-            // 0 and the report_interval.  This way, the coordinator doesn't 
get all the
-            // updates at once so its better for contention as well as 
smoother progress
-            // reporting.
-            int report_fragment_offset = rand() % 
config::status_report_interval;
-            // We don't want to wait longer than it takes to run the entire 
fragment.
-            _stop_report_thread_cv.wait_for(l, 
std::chrono::seconds(report_fragment_offset));
-            while (_report_thread_active) {
-                if (config::status_report_interval > 0) {
-                    // wait_for can return because the timeout occurred or the 
condition variable
-                    // was signaled.  We can't rely on its return value to 
distinguish between the
-                    // two cases (e.g. there is a race here where the wait 
timed out but before grabbing
-                    // the lock, the condition variable was signaled).  
Instead, we will use an external
-                    // flag, _report_thread_active, to coordinate this.
-                    _stop_report_thread_cv.wait_for(
-                            l, 
std::chrono::seconds(config::status_report_interval));
-                } else {
-                    LOG(WARNING) << "config::status_report_interval is equal 
to or less than zero, "
-                                    "exiting "
-                                    "reporting thread.";
-                    break;
-                }
-
-                if (VLOG_FILE_IS_ON) {
-                    VLOG_FILE << "Reporting " << (!_report_thread_active ? 
"final " : " ")
-                              << "profile for instance " << 
runtime_state->fragment_instance_id();
-                    std::stringstream ss;
-                    
runtime_state->runtime_profile()->compute_time_in_profile();
-                    runtime_state->runtime_profile()->pretty_print(&ss);
-                    if (runtime_state->load_channel_profile()) {
-                        // 
runtime_state->load_channel_profile()->compute_time_in_profile(); // TODO load 
channel profile add timer
-                        
runtime_state->load_channel_profile()->pretty_print(&ss);
-                    }
-                    VLOG_FILE << ss.str();
-                }
-
-                if (!_report_thread_active) {
-                    break;
-                }
-
-                send_report(false);
-            }
-
-            VLOG_FILE
-            << "exiting reporting thread: instance_id=" << 
runtime_state->fragment_instance_id();)
-}
-
 Status PipelineXFragmentContext::_build_pipelines(ObjectPool* pool,
                                                   const 
doris::TPipelineFragmentParams& request,
                                                   const DescriptorTbl& descs, 
OperatorXPtr* root,
@@ -864,12 +817,11 @@ void PipelineXFragmentContext::close_if_prepare_failed() {
 void PipelineXFragmentContext::_close_action() {
     _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     send_report(true);
-    _stop_report_thread();
     // all submitted tasks done
     _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this());
 }
 
-void PipelineXFragmentContext::send_report(bool done) {
+Status PipelineXFragmentContext::send_report(bool done) {
     Status exec_status = Status::OK();
     {
         std::lock_guard<std::mutex> l(_status_lock);
@@ -879,7 +831,7 @@ void PipelineXFragmentContext::send_report(bool done) {
     // If plan is done successfully, but _is_report_success is false,
     // no need to send report.
     if (!_is_report_success && done && exec_status.ok()) {
-        return;
+        return Status::NeedSendAgain("");
     }
 
     // If both _is_report_success and _is_report_on_cancel are false,
@@ -887,7 +839,7 @@ void PipelineXFragmentContext::send_report(bool done) {
     // This may happen when the query limit reached and
     // a internal cancellation being processed
     if (!_is_report_success && !_is_report_on_cancel) {
-        return;
+        return Status::NeedSendAgain("");
     }
 
     std::vector<RuntimeState*> runtime_states(_runtime_states.size());
@@ -895,13 +847,13 @@ void PipelineXFragmentContext::send_report(bool done) {
         runtime_states[i] = _runtime_states[i].get();
     }
 
-    _report_status_cb(
+    return _report_status_cb(
             {true, exec_status, runtime_states, nullptr, nullptr, done || !exec_status.ok(),
              _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num,
              _runtime_state.get(),
              std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
              std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
-                       std::placeholders::_2)});
+                       std::placeholders::_2)},
+            shared_from_this());
 }
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 7c54a411c6..c548b8cfa3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -60,7 +60,6 @@ public:
     // Note: this does not take a const RuntimeProfile&, because it might need to call
     // functions like PrettyPrint() or to_thrift(), neither of which is const
     // because they take locks.
-    using report_status_callback = std::function<void(const ReportStatusRequest)>;
     PipelineXFragmentContext(const TUniqueId& query_id, const int fragment_id,
                              std::shared_ptr<QueryContext> query_ctx, ExecEnv* 
exec_env,
                              const std::function<void(RuntimeState*, 
Status*)>& call_back,
@@ -76,6 +75,13 @@ public:
         }
     }
 
+    void instance_ids(std::vector<string>& ins_ids) const override {
+        ins_ids.resize(_runtime_states.size());
+        for (size_t i = 0; i < _runtime_states.size(); i++) {
+            ins_ids[i] = print_id(_runtime_states[i]->fragment_instance_id());
+        }
+    }
+
     void add_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) 
override {
         _merge_controller_handlers.emplace_back(handler);
@@ -94,9 +100,7 @@ public:
     void cancel(const PPlanFragmentCancelReason& reason = 
PPlanFragmentCancelReason::INTERNAL_ERROR,
                 const std::string& msg = "") override;
 
-    void send_report(bool) override;
-
-    void report_profile() override;
+    Status send_report(bool) override;
 
     RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override {
         std::lock_guard<std::mutex> l(_state_map_lock);
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index e71d8da0cd..30079126c2 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -231,8 +231,10 @@ void TaskScheduler::_do_work(size_t index) {
         auto check_state = task->get_state();
         if (check_state == PipelineTaskState::PENDING_FINISH) {
             DCHECK(!task->is_pending_finish()) << "must not pending close " << 
task->debug_string();
+            Status exec_status = fragment_ctx->get_query_context()->exec_status();
             _try_close_task(task,
-                            canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED);
+                            canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
+                            exec_status);
             continue;
         }
         DCHECK(check_state != PipelineTaskState::FINISHED &&
@@ -243,10 +245,11 @@ void TaskScheduler::_do_work(size_t index) {
             // may change from pending FINISH,should called cancel
             // also may change form BLOCK, other task called cancel
 
-            // If pipeline is canceled caused by memory limit, we should send report to FE in order
-            // to cancel all pipeline tasks in this query
-            fragment_ctx->send_report(true);
-            _try_close_task(task, PipelineTaskState::CANCELED);
+            // If the pipeline is canceled, it reports after the pipeline is closed and propagates
+            // errors downstream through the exchange, so there is no need to call send_report here.
+            // fragment_ctx->send_report(true);
+            Status cancel_status = fragment_ctx->get_query_context()->exec_status();
+            _try_close_task(task, PipelineTaskState::CANCELED, cancel_status);
             continue;
         }
 
@@ -276,10 +279,10 @@ void TaskScheduler::_do_work(size_t index) {
 
             // exec failed,cancel all fragment instance
             fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string());
-            fragment_ctx->send_report(true);
             _try_close_task(task, PipelineTaskState::CANCELED, status);
             continue;
         }
+        fragment_ctx->trigger_report_if_necessary();
 
         if (eos) {
             task->set_eos_time();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 79e67503e1..9a916daf4e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -137,6 +137,13 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
     REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
                          [this]() { return _thread_pool->get_queue_size(); });
     CHECK(s.ok()) << s.to_string();
+
+    s = ThreadPoolBuilder("FragmentInstanceReportThreadPool")
+                .set_min_threads(48)
+                .set_max_threads(512)
+                .set_max_queue_size(102400)
+                .build(&_async_report_thread_pool);
+    CHECK(s.ok()) << s.to_string();
 }
 
 FragmentMgr::~FragmentMgr() = default;
@@ -162,6 +169,7 @@ void FragmentMgr::stop() {
         }
         _pipeline_map.clear();
     }
+    _async_report_thread_pool->shutdown();
 }
 
 std::string FragmentMgr::to_http_path(const std::string& file_name) {
@@ -172,6 +180,16 @@ std::string FragmentMgr::to_http_path(const std::string& 
file_name) {
     return url.str();
 }
 
+Status FragmentMgr::trigger_pipeline_context_report(
+        const ReportStatusRequest req, std::shared_ptr<pipeline::PipelineFragmentContext>&& ctx) {
+    return _async_report_thread_pool->submit_func([this, req, ctx]() {
+        coordinator_callback(req);
+        if (!req.done) {
+            ctx->refresh_next_report_time();
+        }
+    });
+}
+
 // There can only be one of these callbacks in-flight at any moment, because
 // it is only invoked from the executor's reporting thread.
 // Also, the reported status will always reflect the most recent execution 
status,
@@ -539,9 +557,11 @@ void FragmentMgr::remove_pipeline_context(
     f_context->instance_ids(ins_ids);
     bool all_done = q_context->countdown(ins_ids.size());
     for (const auto& ins_id : ins_ids) {
+        VLOG_DEBUG << "remove pipeline context " << print_id(ins_id) << ", all_done:" << all_done;
         _pipeline_map.erase(ins_id);
     }
     if (all_done) {
+        LOG(INFO) << "remove query context " << print_id(query_id);
         _query_ctx_map.erase(query_id);
     }
 }
@@ -773,8 +793,9 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
         std::shared_ptr<pipeline::PipelineFragmentContext> context =
                 std::make_shared<pipeline::PipelineXFragmentContext>(
                         query_ctx->query_id(), params.fragment_id, query_ctx, 
_exec_env, cb,
-                        std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
-                                        std::placeholders::_1),
+                        std::bind<Status>(
+                                std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), this,
+                                std::placeholders::_1, std::placeholders::_2),
                         params.group_commit);
         {
             SCOPED_RAW_TIMER(&duration_ns);
@@ -851,8 +872,9 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
                     std::make_shared<pipeline::PipelineFragmentContext>(
                             query_ctx->query_id(), fragment_instance_id, params.fragment_id,
                             local_params.backend_num, query_ctx, _exec_env, cb,
-                            std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
-                                            std::placeholders::_1));
+                            std::bind<Status>(
+                                    std::mem_fn(&FragmentMgr::trigger_pipeline_context_report),
+                                    this, std::placeholders::_1, std::placeholders::_2));
             {
                 SCOPED_RAW_TIMER(&duration_ns);
                 auto prepare_st = context->prepare(params, i);
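The FragmentInstanceReportThreadPool added above decouples the coordinator RPC from the scheduler thread. A self-contained sketch of the same handoff using only standard C++ (std::async stands in for the Doris thread pool; ReportReq and CtxSketch are hypothetical stand-ins, not real Doris types):

#include <functional>
#include <future>
#include <memory>

struct ReportReq { bool done = false; };

struct CtxSketch {
    void refresh_next_report_time() { /* store now(), clear the in-flight flag */ }
};

std::future<void> submit_report(ReportReq req, std::shared_ptr<CtxSketch> ctx,
                                const std::function<void(const ReportReq&)>& coordinator_cb) {
    // std::async stands in for _async_report_thread_pool->submit_func(...).
    return std::async(std::launch::async, [req, ctx, coordinator_cb]() {
        coordinator_cb(req);                 // heavy RPC work runs off the scheduler thread
        if (!req.done) {
            ctx->refresh_next_report_time(); // periodic report: allow the next trigger
        }
    });
}

A final report (req.done == true) does not re-arm the timer, matching the behaviour of trigger_pipeline_context_report() above.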
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 14c63c559b..395b9b546c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -99,6 +99,8 @@ public:
                                   const PPlanFragmentCancelReason& reason,
                                   const std::unique_lock<std::mutex>& 
state_lock,
                                   const std::string& msg = "");
+    Status trigger_pipeline_context_report(const ReportStatusRequest,
+                                           std::shared_ptr<pipeline::PipelineFragmentContext>&&);
 
     // Pipeline version, cancel a fragment instance.
     void cancel_instance(const TUniqueId& instance_id, const 
PPlanFragmentCancelReason& reason,
@@ -203,6 +205,7 @@ private:
     UIntGauge* timeout_canceled_fragment_count = nullptr;
 
     RuntimeFilterMergeController _runtimefilter_controller;
+    std::unique_ptr<ThreadPool> _async_report_thread_pool; // used for pipeline context report
 };
 
 } // namespace doris
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 59a89b44af..4cec558fc5 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -594,7 +594,8 @@ void PlanFragmentExecutor::cancel(const 
PPlanFragmentCancelReason& reason, const
             .tag("reason", reason)
             .tag("error message", msg);
     if (_runtime_state->is_cancelled()) {
-        LOG(INFO) << "instance is already cancelled, skip cancel again";
+        LOG(INFO) << "instance " << print_id(_runtime_state->fragment_instance_id())
+                  << " is already cancelled, skip cancel again";
         return;
     }
     DCHECK(_prepared);
@@ -608,7 +609,7 @@ void PlanFragmentExecutor::cancel(const 
PPlanFragmentCancelReason& reason, const
     _query_ctx->set_ready_to_execute(true);
 
     // must close stream_mgr to avoid dead lock in Exchange Node
-    _exec_env->vstream_mgr()->cancel(_fragment_instance_id);
+    _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg));
     // Cancel the result queue manager used by spark doris connector
     _exec_env->result_queue_mgr()->update_queue_status(_fragment_instance_id, Status::Aborted(msg));
 #ifndef BE_TEST
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 88a8367ff9..2791291bf4 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -42,7 +42,7 @@
 namespace doris {
 struct ReportStatusRequest {
     bool is_pipeline_x;
-    const Status& status;
+    const Status status;
     std::vector<RuntimeState*> runtime_states;
     RuntimeProfile* profile;
     RuntimeProfile* load_channel_profile;
@@ -97,7 +97,9 @@ public:
 
     // Notice. For load fragments, the fragment_num sent by FE has a small probability of 0.
     // this may be a bug, bug <= 1 in theory it shouldn't cause any problems at this stage.
-    bool countdown(int instance_num) { return fragment_num.fetch_sub(instance_num) <= 1; }
+    bool countdown(int instance_num) {
+        return fragment_num.fetch_sub(instance_num) <= instance_num;
+    }
 
     ExecEnv* exec_env() { return _exec_env; }
 
@@ -137,10 +139,10 @@ public:
         if (_is_cancelled) {
             return false;
         }
+        set_exec_status(new_status);
         _is_cancelled.store(v);
 
         set_ready_to_execute(true);
-        set_exec_status(new_status);
         return true;
     }
 
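The countdown() change above is easy to misread: fetch_sub() returns the value held before the subtraction, so the caller that removes the last batch of instances sees a pre-decrement value equal to instance_num, which the old "<= 1" check missed whenever instance_num > 1. A small self-contained check of that behaviour (an illustrative sketch, not Doris code):

#include <atomic>
#include <cassert>

// Same shape as QueryContext::countdown() after the patch.
bool countdown(std::atomic<int>& fragment_num, int instance_num) {
    return fragment_num.fetch_sub(instance_num) <= instance_num;
}

int main() {
    std::atomic<int> remaining{3};
    assert(!countdown(remaining, 2)); // 3 -> 1: not the last caller
    assert(countdown(remaining, 1));  // 1 -> 0: last caller (old "<= 1" also caught this)
    std::atomic<int> remaining2{2};
    assert(countdown(remaining2, 2)); // 2 -> 0: last caller, but the old "<= 1" check missed it
    return 0;
}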
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 1d5fa990bb..ec18e83eab 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -305,6 +305,13 @@ void 
RuntimeState::get_unreported_errors(std::vector<std::string>* new_errors) {
     }
 }
 
+Status RuntimeState::query_status() {
+    auto st = _query_ctx->exec_status();
+    RETURN_IF_ERROR(st);
+    std::lock_guard<std::mutex> l(_process_status_lock);
+    return _process_status;
+}
+
 bool RuntimeState::is_cancelled() const {
     return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled());
 }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 87f0f45b6a..02c048052a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -151,10 +151,7 @@ public:
                _query_options.enable_common_expr_pushdown;
     }
 
-    Status query_status() {
-        std::lock_guard<std::mutex> l(_process_status_lock);
-        return _process_status;
-    }
+    Status query_status();
 
     // Appends error to the _error_log if there is space
     bool log_error(const std::string& error);
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 5367135c13..6ef5066d3b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -556,7 +556,7 @@ void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
         TUniqueId tid;
         tid.__set_hi(request->finst_id().hi());
         tid.__set_lo(request->finst_id().lo());
-
+        signal::set_signal_task_id(tid);
         Status st = Status::OK();
         if (request->has_cancel_reason()) {
             LOG(INFO) << "cancel fragment, fragment_instance_id=" << 
print_id(tid)
@@ -1184,7 +1184,10 @@ void 
PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cont
         if (!st.ok()) {
             LOG(WARNING) << "transmit_block failed, message=" << st
                          << ", fragment_instance_id=" << print_id(request->finst_id())
-                         << ", node=" << request->node_id();
+                         << ", node=" << request->node_id()
+                         << ", from sender_id: " << request->sender_id()
+                         << ", be_number: " << request->be_number()
+                         << ", packet_seq: " << request->packet_seq();
         }
     } else {
         st = extract_st;
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index ad161828f9..70bdb1baff 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -126,7 +126,9 @@ Status VDataStreamMgr::transmit_block(const 
PTransmitDataParams* request,
     }
 
     if (eos) {
-        recvr->remove_sender(request->sender_id(), request->be_number());
+        Status exec_status =
+                request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
+        recvr->remove_sender(request->sender_id(), request->be_number(), exec_status);
     }
     return Status::OK();
 }
@@ -156,7 +158,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& 
fragment_instance_id, P
     // Notify concurrent add_data() requests that the stream has been 
terminated.
     // cancel_stream maybe take a long time, so we handle it out of lock.
     if (targert_recvr) {
-        targert_recvr->cancel_stream();
+        targert_recvr->cancel_stream(Status::OK());
         return Status::OK();
     } else {
         std::stringstream err;
@@ -167,7 +169,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& 
fragment_instance_id, P
     }
 }
 
-void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id) {
+void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) {
     VLOG_QUERY << "cancelling all streams for fragment=" << 
fragment_instance_id;
     std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
     {
@@ -191,7 +193,7 @@ void VDataStreamMgr::cancel(const TUniqueId& 
fragment_instance_id) {
 
     // cancel_stream maybe take a long time, so we handle it out of lock.
     for (auto& it : recvrs) {
-        it->cancel_stream();
+        it->cancel_stream(exec_status);
     }
 }
 
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h 
b/be/src/vec/runtime/vdata_stream_mgr.h
index ca0e7ab4b7..d809ff96fb 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -63,7 +63,7 @@ public:
 
     Status transmit_block(const PTransmitDataParams* request, 
::google::protobuf::Closure** done);
 
-    void cancel(const TUniqueId& fragment_instance_id);
+    void cancel(const TUniqueId& fragment_instance_id, Status exec_status);
 
 private:
     std::mutex _lock;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index dd3e1cee29..d5abb50e0d 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -46,7 +46,9 @@ VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* 
parent_recvr, int n
         : _recvr(parent_recvr),
           _is_cancelled(false),
           _num_remaining_senders(num_senders),
-          _received_first_batch(false) {}
+          _received_first_batch(false) {
+    _cancel_status = Status::OK();
+}
 
 VDataStreamRecvr::SenderQueue::~SenderQueue() {
     // Check pending closures, if it is not empty, should clear it here. but 
it should not happen.
@@ -81,6 +83,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, 
bool* eos) {
 
 Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* 
block, bool* eos) {
     if (_is_cancelled) {
+        RETURN_IF_ERROR(_cancel_status);
         return Status::Cancelled("Cancelled");
     }
 
@@ -270,17 +273,19 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int 
be_number) {
     }
 }
 
-void VDataStreamRecvr::SenderQueue::cancel() {
+void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
     {
         std::lock_guard<std::mutex> l(_lock);
         if (_is_cancelled) {
             return;
         }
         _is_cancelled = true;
+        _cancel_status = cancel_status;
         if (_dependency) {
             _dependency->set_always_done();
         }
-        VLOG_QUERY << "cancelled stream: _fragment_instance_id=" << _recvr->fragment_instance_id()
+        VLOG_QUERY << "cancelled stream: _fragment_instance_id="
+                   << print_id(_recvr->fragment_instance_id())
                    << " node_id=" << _recvr->dest_node_id();
     }
     // Wake up all threads waiting to produce/consume batches.  They will all
@@ -444,14 +449,21 @@ Status VDataStreamRecvr::get_next(Block* block, bool* 
eos) {
     }
 }
 
-void VDataStreamRecvr::remove_sender(int sender_id, int be_number) {
+void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) {
+    if (!exec_status.ok()) {
+        cancel_stream(exec_status);
+        return;
+    }
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->decrement_senders(be_number);
 }
 
-void VDataStreamRecvr::cancel_stream() {
+void VDataStreamRecvr::cancel_stream(Status exec_status) {
+    VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id)
+               << exec_status;
+
     for (int i = 0; i < _sender_queues.size(); ++i) {
-        _sender_queues[i]->cancel();
+        _sender_queues[i]->cancel(exec_status);
     }
 }
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index d0b009b22f..d8ab873f87 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -106,9 +106,9 @@ public:
 
     // Indicate that a particular sender is done. Delegated to the appropriate
     // sender queue. Called from DataStreamMgr.
-    void remove_sender(int sender_id, int be_number);
+    void remove_sender(int sender_id, int be_number, Status exec_status);
 
-    void cancel_stream();
+    void cancel_stream(Status exec_status);
 
     void close();
 
@@ -209,7 +209,7 @@ public:
 
     void decrement_senders(int sender_id);
 
-    void cancel();
+    void cancel(Status cancel_status);
 
     void close();
 
@@ -233,6 +233,7 @@ protected:
     VDataStreamRecvr* _recvr;
     std::mutex _lock;
     bool _is_cancelled;
+    Status _cancel_status;
     int _num_remaining_senders;
     std::condition_variable _data_arrival_cv;
     std::condition_variable _data_removal_cv;
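The _cancel_status member added above lets the receiver remember why it was cancelled, so a reader blocked in get_batch() surfaces the sender's real error instead of a generic "Cancelled". A self-contained sketch of that pattern with stand-in types (not the actual Doris classes):

#include <mutex>
#include <string>

struct StatusSketch {
    bool ok = true;
    std::string msg;
};

class SenderQueueSketch {
public:
    void cancel(StatusSketch cancel_status) {
        std::lock_guard<std::mutex> l(_lock);
        if (_is_cancelled) return;   // keep the first cause, ignore later cancels
        _is_cancelled = true;
        _cancel_status = cancel_status;
    }
    StatusSketch get_batch_status() {
        std::lock_guard<std::mutex> l(_lock);
        if (_is_cancelled) {
            if (!_cancel_status.ok) return _cancel_status; // propagate the real error
            return {false, "Cancelled"};                   // plain cancellation
        }
        return {true, ""};
    }
private:
    std::mutex _lock;
    bool _is_cancelled = false;
    StatusSketch _cancel_status;
};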
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 263a063590..61d95538c9 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -109,23 +109,23 @@ Status Channel<Parent>::init(RuntimeState* state) {
 }
 
 template <typename Parent>
-Status Channel<Parent>::send_current_block(bool eos) {
+Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
     // FIXME: Now, local exchange will cause a performance problem in a multi-threaded scenario,
     // so this feature is turned off here by default. We need to re-examine this logic
     if (is_local()) {
-        return send_local_block(eos);
+        return send_local_block(exec_status, eos);
     }
     SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
     if (eos) {
         RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
     }
-    RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
+    RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status));
     ch_roll_pb_block();
     return Status::OK();
 }
 
 template <typename Parent>
-Status Channel<Parent>::send_local_block(bool eos) {
+Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
     if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) 
{
         SCOPED_TIMER(_parent->local_send_timer());
     }
@@ -140,7 +140,7 @@ Status Channel<Parent>::send_local_block(bool eos) {
 
         _local_recvr->add_block(&block, _parent->sender_id(), true);
         if (eos) {
-            _local_recvr->remove_sender(_parent->sender_id(), _be_number);
+            _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
         }
         return Status::OK();
     } else {
@@ -168,7 +168,7 @@ Status Channel<Parent>::send_local_block(Block* block) {
 }
 
 template <typename Parent>
-Status Channel<Parent>::send_block(PBlock* block, bool eos) {
+Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) {
     if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) 
{
         SCOPED_TIMER(_parent->brpc_send_timer());
         COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
@@ -182,7 +182,7 @@ Status Channel<Parent>::send_block(PBlock* block, bool eos) 
{
         SCOPED_TRACK_MEMORY_TO_UNKNOWN();
         _closure->cntl.Reset();
     }
-    VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << _fragment_instance_id
+    VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << print_id(_fragment_instance_id)
             << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname
             << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string();
     if (_is_transfer_chain && (_send_query_statistics_with_every_batch || 
eos)) {
@@ -191,6 +191,9 @@ Status Channel<Parent>::send_block(PBlock* block, bool eos) 
{
     }
 
     _brpc_request.set_eos(eos);
+    if (!exec_status.ok()) {
+        exec_status.to_protobuf(_brpc_request.mutable_exec_status());
+    }
     if (block != nullptr) {
         _brpc_request.set_allocated_block(block);
     }
@@ -228,7 +231,7 @@ Status Channel<Parent>::add_rows(Block* block, const std::vector<int>& rows, boo
     RETURN_IF_ERROR(
             _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows));
     if (serialized) {
-        RETURN_IF_ERROR(send_current_block(false));
+        RETURN_IF_ERROR(send_current_block(false, Status::OK()));
     }
 
     return Status::OK();
@@ -251,29 +254,29 @@ Status Channel<Parent>::close_wait(RuntimeState* state) {
 }
 
 template <typename Parent>
-Status Channel<Parent>::close_internal() {
+Status Channel<Parent>::close_internal(Status exec_status) {
     if (!_need_close) {
         return Status::OK();
     }
-    VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
+    VLOG_RPC << "Channel::close_internal() instance_id=" << print_id(_fragment_instance_id)
              << " dest_node=" << _dest_node_id << " #rows= "
              << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows())
-             << " receiver status: " << _receiver_status;
+             << " receiver status: " << _receiver_status << ", exec_status: " << exec_status;
     if (is_receiver_eof()) {
         _serializer.reset_block();
         return Status::OK();
     }
     Status status;
     if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) {
-        status = send_current_block(true);
+        status = send_current_block(true, exec_status);
     } else {
         SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
         if (is_local()) {
             if (_recvr_is_valid()) {
-                _local_recvr->remove_sender(_parent->sender_id(), _be_number);
+                _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
             }
         } else {
-            status = send_block((PBlock*)nullptr, true);
+            status = send_remote_block((PBlock*)nullptr, true, exec_status);
         }
     }
     // Don't wait for the last packet to finish, left it to close_wait.
@@ -285,13 +288,13 @@ Status Channel<Parent>::close_internal() {
 }
 
 template <typename Parent>
-Status Channel<Parent>::close(RuntimeState* state) {
+Status Channel<Parent>::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
     }
     _closed = true;
 
-    Status st = close_internal();
+    Status st = close_internal(exec_status);
     if (!st.ok()) {
         state->log_error(st.to_string());
     }
@@ -497,7 +500,8 @@ template <typename ChannelPtrType>
 void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
                                             Status st) {
     channel->set_receiver_eof(st);
-    channel->close(state);
+    // Channel will not send an RPC to the downstream on EOF, so close the channel with OK status.
+    channel->close(state, Status::OK());
 }
 
 Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
@@ -551,7 +555,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                                 status = channel->send_local_block(&cur_block);
                             } else {
                                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                                status = channel->send_block(block_holder, nullptr, eos);
+                                status = channel->send_broadcast_block(block_holder, nullptr, eos);
                             }
                             HANDLE_CHANNEL_STATUS(state, channel, status);
                         }
@@ -578,7 +582,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                             status = channel->send_local_block(&cur_block);
                         } else {
                             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                            status = channel->send_block(_cur_pb_block, false);
+                            status = channel->send_remote_block(_cur_pb_block, false);
                         }
                         HANDLE_CHANNEL_STATUS(state, channel, status);
                     }
@@ -600,7 +604,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                 RETURN_IF_ERROR(
                         _serializer.serialize_block(block, current_channel->ch_cur_pb_block()));
-                auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
+                auto status =
+                        current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
                 current_channel->ch_roll_pb_block();
             }
@@ -682,7 +687,7 @@ Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
     _serializer.reset_block();
     Status final_st = Status::OK();
     for (int i = 0; i < _channels.size(); ++i) {
-        Status st = _channels[i]->close(state);
+        Status st = _channels[i]->close(state, exec_status);
         if (!st.ok() && final_st.ok()) {
             final_st = st;
         }
@@ -711,7 +716,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
                             status = channel->send_local_block(&block);
                         } else {
                             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                            status = channel->send_block(_cur_pb_block, false);
+                            status = channel->send_remote_block(_cur_pb_block, false);
                         }
                         HANDLE_CHANNEL_STATUS(state, channel, status);
                     }
@@ -719,7 +724,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
             }
         }
         for (int i = 0; i < _channels.size(); ++i) {
-            Status st = _channels[i]->close(state);
+            Status st = _channels[i]->close(state, exec_status);
             if (!st.ok() && final_st.ok()) {
                 final_st = st;
             }
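
Since Channel::close() now takes the sink's final execution status, a sink passes whatever status it finished with and each channel forwards it to its receiver together with EOS (remotely inside send_remote_block, locally via remove_sender). A hedged sketch of that calling pattern follows, mirroring the try_close/close loops above; close_all_channels is a hypothetical helper name, not a function added by this patch.

    template <typename ChannelPtrType>
    Status close_all_channels(RuntimeState* state, std::vector<ChannelPtrType>& channels,
                              Status exec_status) {
        Status final_st = Status::OK();
        for (auto& channel : channels) {
            // The channel ships exec_status with its EOS packet (or via remove_sender locally).
            Status st = channel->close(state, exec_status);
            if (!st.ok() && final_st.ok()) {
                final_st = st;   // remember the first close failure, keep closing the rest
            }
        }
        return final_st;
    }
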
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 1698e5e564..dfade831ea 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -273,25 +273,26 @@ public:
     // Returns the status of the most recently finished transmit_data
     // rpc (or OK if there wasn't one that hasn't been reported yet).
     // if batch is nullptr, send the eof packet
-    virtual Status send_block(PBlock* block, bool eos = false);
+    virtual Status send_remote_block(PBlock* block, bool eos = false,
+                                     Status exec_status = Status::OK());
 
-    virtual Status send_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent,
-                              bool eos = false) {
+    virtual Status send_broadcast_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent,
+                                        bool eos = false) {
         return Status::InternalError("Send BroadcastPBlockHolder is not 
allowed!");
     }
 
     virtual Status add_rows(Block* block, const std::vector<int>& row, bool eos);
 
-    virtual Status send_current_block(bool eos);
+    virtual Status send_current_block(bool eos, Status exec_status);
 
-    Status send_local_block(bool eos = false);
+    Status send_local_block(Status exec_status, bool eos = false);
 
     Status send_local_block(Block* block);
     // Flush buffered rows and close channel. This function don't wait the response
     // of close operation, client should call close_wait() to finish channel's close.
     // We split one close operation into two phases in order to make multiple channels
     // can run parallel.
-    Status close(RuntimeState* state);
+    Status close(RuntimeState* state, Status exec_status);
 
     // Get close wait's response, to finish channel close operation.
     Status close_wait(RuntimeState* state);
@@ -362,7 +363,7 @@ protected:
     // Serialize _batch into _thrift_batch and send via send_batch().
     // Returns send_batch() status.
     Status send_current_batch(bool eos = false);
-    Status close_internal();
+    Status close_internal(Status exec_status);
 
     Parent* _parent;
 
@@ -476,7 +477,8 @@ public:
     // Returns the status of the most recently finished transmit_data
     // rpc (or OK if there wasn't one that hasn't been reported yet).
     // if batch is nullptr, send the eof packet
-    Status send_block(PBlock* block, bool eos = false) override {
+    Status send_remote_block(PBlock* block, bool eos = false,
+                             Status exec_status = Status::OK()) override {
         COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
         std::unique_ptr<PBlock> pblock_ptr;
         pblock_ptr.reset(block);
@@ -489,13 +491,13 @@ public:
             }
         }
         if (eos || block->column_metas_size()) {
-            RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos}));
+            RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos, exec_status}));
         }
         return Status::OK();
     }
 
-    Status send_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent,
-                      bool eos = false) override {
+    Status send_broadcast_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent,
+                                bool eos = false) override {
         COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
         if (eos) {
             if (_eos_send) {
@@ -520,19 +522,20 @@ public:
         RETURN_IF_ERROR(Channel<Parent>::_serializer.next_serialized_block(
                 block, _pblock.get(), 1, &serialized, eos, &rows));
         if (serialized) {
-            RETURN_IF_ERROR(send_current_block(eos));
+            Status exec_status = Status::OK();
+            RETURN_IF_ERROR(send_current_block(eos, exec_status));
         }
 
         return Status::OK();
     }
 
     // send _mutable_block
-    Status send_current_block(bool eos) override {
+    Status send_current_block(bool eos, Status exec_status) override {
         if (Channel<Parent>::is_local()) {
-            return Channel<Parent>::send_local_block(eos);
+            return Channel<Parent>::send_local_block(exec_status, eos);
         }
         SCOPED_CONSUME_MEM_TRACKER(Channel<Parent>::_parent->mem_tracker());
-        RETURN_IF_ERROR(send_block(_pblock.release(), eos));
+        RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status));
         return Status::OK();
     }
 
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 878544e74b..c757315f0d 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -49,6 +49,7 @@ message PTransmitDataParams {
     // transfer the RowBatch to the Controller Attachment
     optional bool transfer_by_attachment = 10 [default = false];
     optional PUniqueId query_id = 11;
+    optional PStatus exec_status = 12;
 };
 
 message PTransmitDataResult {
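
On the wire, the sender's final status travels in the new optional exec_status field of PTransmitDataParams; send_remote_block only fills it when the status is not OK, so older senders and receivers remain compatible. A sketch of how a receiving side could read it with the generated protobuf accessors follows; pstatus_to_status is a hypothetical PStatus-to-Status conversion helper, not something defined in this patch.

    // Sketch only: extract the sender's final status from the request.
    Status exec_status_from(const PTransmitDataParams& request) {
        if (request.has_exec_status()) {
            // pstatus_to_status is a hypothetical PStatus -> Status conversion helper.
            return pstatus_to_status(request.exec_status());
        }
        return Status::OK();   // field absent: the sender finished normally
    }
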


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
