This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 35f965532d1 [bugfix](becore) local exchange should check receiver's 
local state to avoid core (#45470)
35f965532d1 is described below

commit 35f965532d1a1a538c1ea92ca5f241e6580d5461
Author: yiguolei <guo...@selectdb.com>
AuthorDate: Tue Dec 17 18:47:29 2024 +0800

    [bugfix](becore) local exchange should check receiver's local state to 
avoid core (#45470)
    
    
    
    _local_recvr depends on pipeline::ExchangeLocalState* _parent to do some
    memory counter settings,
    but it only holds a raw pointer, so the ExchangeLocalState object
    may already be destructed.
    So I lock the local state to prevent it from being destructed.
---
 be/src/vec/sink/vdata_stream_sender.cpp | 24 +++++++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index ceb7be95e40..a4a9e6cccf8 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -97,8 +97,14 @@ Status Channel::open(RuntimeState* state) {
         auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
                 _fragment_instance_id, _dest_node_id, &_local_recvr);
         if (!st.ok()) {
-            // Recvr not found. Maybe downstream task is finished already.
-            LOG(INFO) << "Recvr is not found : " << st.to_string();
+            // If could not find local receiver, then it means the channel is 
EOF.
+            // Maybe downstream task is finished already.
+            //if (_receiver_status.ok()) {
+            //    _receiver_status = Status::EndOfFile("local data stream 
receiver is deconstructed");
+            //}
+            LOG(INFO) << "Query: " << print_id(state->query_id())
+                      << " recvr is not found, maybe downstream task is 
finished. error st is: "
+                      << st.to_string();
         }
     }
     _be_number = state->be_number();
@@ -191,8 +197,20 @@ Status Channel::send_local_block(Block* block, bool eos, 
bool can_be_moved) {
     if (is_receiver_eof()) {
         return _receiver_status;
     }
-
     auto receiver_status = _recvr_status();
+    // _local_recvr depdend on pipeline::ExchangeLocalState* _parent to do 
some memory counter settings
+    // but it only owns a raw pointer, so that the ExchangeLocalState object 
may be deconstructed.
+    // Lock the fragment context to ensure the runtime state and other objects 
are not deconstructed
+    TaskExecutionContextSPtr ctx_lock = nullptr;
+    if (receiver_status.ok() && _local_recvr != nullptr) {
+        ctx_lock = _local_recvr->task_exec_ctx();
+        // Do not return internal error, because when query finished, the 
downstream node
+        // may finish before upstream node. And the object maybe 
deconstructed. If return error
+        // then the upstream node may report error status to FE, the query is 
failed.
+        if (ctx_lock == nullptr) {
+            receiver_status = Status::EndOfFile("local data stream receiver is 
deconstructed");
+        }
+    }
     if (receiver_status.ok()) {
         COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
         COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to