This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a430910e6e [fix](shuffle) Do not return error if local recvr is null (#35329)
6a430910e6e is described below

commit 6a430910e6e5e3a1cb6dbd33133b37edb84dd2f5
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sun May 26 11:44:46 2024 +0800

    [fix](shuffle) Do not return error if local recvr is null (#35329)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 17 +++++++++++------
 be/src/vec/sink/vdata_stream_sender.cpp         |  6 +-----
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 69cd139714f..244184bc7a3 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -155,12 +155,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
         size_t dep_id = 0;
         for (auto* channel : channels) {
             if (channel->is_local()) {
-                _local_channels_dependency.push_back(channel->get_local_channel_dependency());
-                DCHECK(_local_channels_dependency[dep_id] != nullptr);
-                _wait_channel_timer.push_back(_profile->add_nonzero_counter(
-                        fmt::format("WaitForLocalExchangeBuffer{}", dep_id), TUnit ::TIME_NS,
-                        timer_name, 1));
-                dep_id++;
+                if (auto dep = channel->get_local_channel_dependency()) {
+                    _local_channels_dependency.push_back(dep);
+                    DCHECK(_local_channels_dependency[dep_id] != nullptr);
+                    _wait_channel_timer.push_back(_profile->add_nonzero_counter(
+                            fmt::format("WaitForLocalExchangeBuffer{}", dep_id), TUnit ::TIME_NS,
+                            timer_name, 1));
+                    dep_id++;
+                } else {
+                    LOG(WARNING) << "local recvr is null: query id = "
+                                 << print_id(state->query_id()) << " node id = " << p.node_id();
+                }
             }
         }
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 307e84e9290..63f2aa19515 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -178,11 +178,7 @@ Status Channel<Parent>::open(RuntimeState* state) {
 
 std::shared_ptr<pipeline::Dependency> PipChannel::get_local_channel_dependency() {
     if (!Channel<pipeline::ExchangeSinkLocalState>::_local_recvr) {
-        throw Exception(
-                ErrorCode::INTERNAL_ERROR,
-                "_local_recvr is null: " +
-                        std::to_string(Channel<pipeline::ExchangeSinkLocalState>::_parent->parent()
-                                               ->node_id()));
+        return nullptr;
     }
     return Channel<pipeline::ExchangeSinkLocalState>::_local_recvr->get_local_channel_dependency(
             Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to