This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 76502fdb97e718e7f019f153bb8da4fc23d6370f
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Apr 12 16:22:35 2024 +0800

    [pipelineX](broadcast) Set dependency ready if a limited exchange returns EOS (#33525)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp   | 2 ++
 be/src/pipeline/exec/exchange_sink_buffer.h     | 9 +++++++--
 be/src/pipeline/exec/exchange_sink_operator.cpp | 1 +
 3 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 9f692d0beeb..44b655150af 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -464,6 +464,8 @@ void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
         _rpc_channel_is_idle[id] = true;
         _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
     }
+    std::queue<BroadcastTransmitInfo<Parent>, std::list<BroadcastTransmitInfo<Parent>>> empty;
+    swap(empty, _instance_to_broadcast_package_queue[id]);
 }
 
 template <typename Parent>
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 8c0375499c3..43fdc98d24c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -216,6 +216,10 @@ public:
         _finish_dependency = finish_dependency;
     }
 
+    void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
+        _broadcast_dependency = broadcast_dependency;
+    }
+
     void set_should_stop() {
         _should_stop = true;
         _set_ready_to_finish(_busy_channels == 0);
@@ -270,8 +274,9 @@ private:
     int64_t get_sum_rpc_time();
 
     std::atomic<int> _total_queue_size = 0;
-    std::shared_ptr<Dependency> _queue_dependency;
-    std::shared_ptr<Dependency> _finish_dependency;
+    std::shared_ptr<Dependency> _queue_dependency = nullptr;
+    std::shared_ptr<Dependency> _finish_dependency = nullptr;
+    std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
     std::atomic<bool> _should_stop {false};
 };
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index ca50f7bd053..07c7130894a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -181,6 +181,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
         _broadcast_dependency =
                 Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
                                           "BroadcastDependency", true, state->get_query_ctx());
+        _sink_buffer->set_broadcast_dependency(_broadcast_dependency);
         _broadcast_pb_blocks =
                 vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency);
         for (int i = 0; i < config::num_broadcast_buffer; ++i) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to