This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 76502fdb97e718e7f019f153bb8da4fc23d6370f Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Apr 12 16:22:35 2024 +0800 [pipelineX](broadcast) Set dependency ready if a limited exchange returns EOS (#33525) --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 2 ++ be/src/pipeline/exec/exchange_sink_buffer.h | 9 +++++++-- be/src/pipeline/exec/exchange_sink_operator.cpp | 1 + 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 9f692d0beeb..44b655150af 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -464,6 +464,8 @@ void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) { _rpc_channel_is_idle[id] = true; _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1); } + std::queue<BroadcastTransmitInfo<Parent>, std::list<BroadcastTransmitInfo<Parent>>> empty; + swap(empty, _instance_to_broadcast_package_queue[id]); } template <typename Parent> diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 8c0375499c3..43fdc98d24c 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -216,6 +216,10 @@ public: _finish_dependency = finish_dependency; } + void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) { + _broadcast_dependency = broadcast_dependency; + } + void set_should_stop() { _should_stop = true; _set_ready_to_finish(_busy_channels == 0); @@ -270,8 +274,9 @@ private: int64_t get_sum_rpc_time(); std::atomic<int> _total_queue_size = 0; - std::shared_ptr<Dependency> _queue_dependency; - std::shared_ptr<Dependency> _finish_dependency; + std::shared_ptr<Dependency> _queue_dependency = nullptr; + std::shared_ptr<Dependency> _finish_dependency = nullptr; + std::shared_ptr<Dependency> _broadcast_dependency = nullptr; std::atomic<bool> _should_stop {false}; }; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index ca50f7bd053..07c7130894a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -181,6 +181,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _broadcast_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "BroadcastDependency", true, state->get_query_ctx()); + _sink_buffer->set_broadcast_dependency(_broadcast_dependency); _broadcast_pb_blocks = vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency); for (int i = 0; i < config::num_broadcast_buffer; ++i) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org