This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 746ecd7d18a [fix](exchange)fix exchange sink buffer does not update total_queue_size when EOF. (#47322) 746ecd7d18a is described below commit 746ecd7d18a43de3d148b00ae22af07031128ff1 Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Sun Feb 9 14:13:46 2025 +0800 [fix](exchange)fix exchange sink buffer does not update total_queue_size when EOF. (#47322) ### What problem does this PR solve? pick part from https://github.com/apache/doris/pull/47312 https://github.com/apache/doris/pull/41602 EOF clears _instance_to_package_queue but does not update total_queue_size, causing incorrect judgments that rely on total_queue_size. UT ``` mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :2 mock transmit_blockv2 dest ins id :3 queue size : 6 each queue size : Instance: 2, queue size: 2 Instance: 1, queue size: 2 Instance: 3, queue size: 2 queue size : 6 // error size each queue size : Instance: 2, queue size: 0 Instance: 1, queue size: 2 Instance: 3, queue size: 2 mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :1 mock transmit_blockv2 dest ins id :3 mock transmit_blockv2 dest ins id :3 ``` --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 1e257e1f86f..9cedfaa6835 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -431,11 +431,19 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id]; for (; !q.empty(); q.pop()) { + // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, + // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked + _total_queue_size--; if (q.front().block) { COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong()); } } + // Try to wake up pipeline after clearing the queue + if (_queue_dependency && _total_queue_size <= _queue_capacity) { + _queue_dependency->set_ready(); + } + { std::queue<TransmitInfo, std::list<TransmitInfo>> empty; swap(empty, q); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org