This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new ade1841a01d [fix](shuffle) Do not return error if local recvr is null 
(#35399)
ade1841a01d is described below

commit ade1841a01d39c5c114d205549d58c9353b09a35
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sun May 26 20:20:50 2024 +0800

    [fix](shuffle) Do not return error if local recvr is null (#35399)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 17 +++++++++++------
 be/src/vec/sink/vdata_stream_sender.cpp         |  4 +---
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 2e0972ca5ee..dfcb3e8e120 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -210,12 +210,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
         size_t dep_id = 0;
         for (auto* channel : channels) {
             if (channel->is_local()) {
-                
_local_channels_dependency.push_back(channel->get_local_channel_dependency());
-                DCHECK(_local_channels_dependency[dep_id] != nullptr);
-                _wait_channel_timer.push_back(_profile->add_nonzero_counter(
-                        fmt::format("WaitForLocalExchangeBuffer{}", dep_id), 
TUnit ::TIME_NS,
-                        timer_name, 1));
-                dep_id++;
+                if (auto dep = channel->get_local_channel_dependency()) {
+                    _local_channels_dependency.push_back(dep);
+                    DCHECK(_local_channels_dependency[dep_id] != nullptr);
+                    
_wait_channel_timer.push_back(_profile->add_nonzero_counter(
+                            fmt::format("WaitForLocalExchangeBuffer{}", 
dep_id), TUnit ::TIME_NS,
+                            timer_name, 1));
+                    dep_id++;
+                } else {
+                    LOG(WARNING) << "local recvr is null: query id = "
+                                 << print_id(state->query_id()) << " node id = 
" << p.node_id();
+                }
             }
         }
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 24f92bf2aae..ca731b48b37 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -180,9 +180,7 @@ template <typename Parent>
 std::shared_ptr<pipeline::Dependency> 
PipChannel<Parent>::get_local_channel_dependency() {
     if (!Channel<Parent>::_local_recvr) {
         if constexpr (std::is_same_v<pipeline::ExchangeSinkLocalState, 
Parent>) {
-            throw Exception(ErrorCode::INTERNAL_ERROR,
-                            "_local_recvr is null: " +
-                                    
std::to_string(Channel<Parent>::_parent->parent()->node_id()));
+            return nullptr;
         } else {
             throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null");
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to