This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 35f965532d1 [bugfix](becore) local exchange should check receiver's local state to avoid core (#45470) 35f965532d1 is described below commit 35f965532d1a1a538c1ea92ca5f241e6580d5461 Author: yiguolei <guo...@selectdb.com> AuthorDate: Tue Dec 17 18:47:29 2024 +0800 [bugfix](becore) local exchange should check receiver's local state to avoid core (#45470) _local_recvr depends on pipeline::ExchangeLocalState* _parent to do some memory counter settings, but it only holds a raw (non-owning) pointer, so the ExchangeLocalState object may already have been destructed. Therefore, lock the local state to prevent it from being destructed while it is in use --- be/src/vec/sink/vdata_stream_sender.cpp | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index ceb7be95e40..a4a9e6cccf8 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -97,8 +97,14 @@ Status Channel::open(RuntimeState* state) { auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr( _fragment_instance_id, _dest_node_id, &_local_recvr); if (!st.ok()) { - // Recvr not found. Maybe downstream task is finished already. - LOG(INFO) << "Recvr is not found : " << st.to_string(); + // If could not find local receiver, then it means the channel is EOF. + // Maybe downstream task is finished already. + //if (_receiver_status.ok()) { + // _receiver_status = Status::EndOfFile("local data stream receiver is deconstructed"); + //} + LOG(INFO) << "Query: " << print_id(state->query_id()) + << " recvr is not found, maybe downstream task is finished. 
error st is: " + << st.to_string(); } } _be_number = state->be_number(); @@ -191,8 +197,20 @@ Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) { if (is_receiver_eof()) { return _receiver_status; } - auto receiver_status = _recvr_status(); + // _local_recvr depdend on pipeline::ExchangeLocalState* _parent to do some memory counter settings + // but it only owns a raw pointer, so that the ExchangeLocalState object may be deconstructed. + // Lock the fragment context to ensure the runtime state and other objects are not deconstructed + TaskExecutionContextSPtr ctx_lock = nullptr; + if (receiver_status.ok() && _local_recvr != nullptr) { + ctx_lock = _local_recvr->task_exec_ctx(); + // Do not return internal error, because when query finished, the downstream node + // may finish before upstream node. And the object maybe deconstructed. If return error + // then the upstream node may report error status to FE, the query is failed. + if (ctx_lock == nullptr) { + receiver_status = Status::EndOfFile("local data stream receiver is deconstructed"); + } + } if (receiver_status.ok()) { COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes()); COUNTER_UPDATE(_parent->local_sent_rows(), block->rows()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org