This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 6a430910e6e [fix](shuffle) Do not return error if local recvr is null (#35329)
6a430910e6e is described below

commit 6a430910e6e5e3a1cb6dbd33133b37edb84dd2f5
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sun May 26 11:44:46 2024 +0800

    [fix](shuffle) Do not return error if local recvr is null (#35329)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 17 +++++++++++------
 be/src/vec/sink/vdata_stream_sender.cpp         |  6 +-----
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 69cd139714f..244184bc7a3 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -155,12 +155,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
         size_t dep_id = 0;
         for (auto* channel : channels) {
             if (channel->is_local()) {
-                _local_channels_dependency.push_back(channel->get_local_channel_dependency());
-                DCHECK(_local_channels_dependency[dep_id] != nullptr);
-                _wait_channel_timer.push_back(_profile->add_nonzero_counter(
-                        fmt::format("WaitForLocalExchangeBuffer{}", dep_id), TUnit ::TIME_NS,
-                        timer_name, 1));
-                dep_id++;
+                if (auto dep = channel->get_local_channel_dependency()) {
+                    _local_channels_dependency.push_back(dep);
+                    DCHECK(_local_channels_dependency[dep_id] != nullptr);
+                    _wait_channel_timer.push_back(_profile->add_nonzero_counter(
+                            fmt::format("WaitForLocalExchangeBuffer{}", dep_id), TUnit ::TIME_NS,
+                            timer_name, 1));
+                    dep_id++;
+                } else {
+                    LOG(WARNING) << "local recvr is null: query id = "
+                                 << print_id(state->query_id()) << " node id = " << p.node_id();
+                }
             }
         }
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 307e84e9290..63f2aa19515 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -178,11 +178,7 @@ Status Channel<Parent>::open(RuntimeState* state) {
 
 std::shared_ptr<pipeline::Dependency> PipChannel::get_local_channel_dependency() {
     if (!Channel<pipeline::ExchangeSinkLocalState>::_local_recvr) {
-        throw Exception(
-                ErrorCode::INTERNAL_ERROR,
-                "_local_recvr is null: " +
-                        std::to_string(Channel<pipeline::ExchangeSinkLocalState>::_parent->parent()
-                                               ->node_id()));
+        return nullptr;
     }
     return Channel<pipeline::ExchangeSinkLocalState>::_local_recvr->get_local_channel_dependency(
             Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org