This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 746ecd7d18a [fix](exchange)fix exchange sink buffer does not update total_queue_size when EOF. (#47322)
746ecd7d18a is described below

commit 746ecd7d18a43de3d148b00ae22af07031128ff1
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Sun Feb 9 14:13:46 2025 +0800

    [fix](exchange)fix exchange sink buffer does not update total_queue_size when EOF. (#47322)
    
    ### What problem does this PR solve?
    Picked in part from https://github.com/apache/doris/pull/47312
    
    
    https://github.com/apache/doris/pull/41602
    On EOF, _set_receiver_eof clears the receiver's queue in
    _instance_to_package_queue but does not decrement _total_queue_size,
    so any check that relies on _total_queue_size sees a stale value.
    
    UT
    
    ```
    mock transmit_blockv2 dest ins id :1
    mock transmit_blockv2 dest ins id :2
    mock transmit_blockv2 dest ins id :3
    queue size : 6
    each queue size :
    Instance: 2, queue size: 2
    Instance: 1, queue size: 2
    Instance: 3, queue size: 2
    
    queue size : 6 // wrong: should be 4 after instance 2's queue is cleared
    each queue size :
    Instance: 2, queue size: 0
    Instance: 1, queue size: 2
    Instance: 3, queue size: 2
    
    mock transmit_blockv2 dest ins id :1
    mock transmit_blockv2 dest ins id :1
    mock transmit_blockv2 dest ins id :3
    mock transmit_blockv2 dest ins id :3
    ```
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 1e257e1f86f..9cedfaa6835 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -431,11 +431,19 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
 
     std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
     for (; !q.empty(); q.pop()) {
+        // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF,
+        // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked
+        _total_queue_size--;
         if (q.front().block) {
             COUNTER_UPDATE(_parent->memory_used_counter(), -q.front().block->ByteSizeLong());
         }
     }
 
+    // Try to wake up pipeline after clearing the queue
+    if (_queue_dependency && _total_queue_size <= _queue_capacity) {
+        _queue_dependency->set_ready();
+    }
+
     {
         std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
         swap(empty, q);

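The accounting invariant this patch restores can be illustrated with a small standalone model. This is only a sketch and is not Doris code: `MiniSinkBuffer`, its members, and the `ready` flag (standing in for the queue dependency) are hypothetical names chosen for illustration, and packages are modelled as plain integers.

```cpp
#include <cstddef>
#include <map>
#include <queue>

// Hypothetical stand-in for ExchangeSinkBuffer (illustration only, not Doris code).
struct MiniSinkBuffer {
    std::map<int, std::queue<int>> instance_queues; // pending packages per receiver
    std::size_t total_queue_size = 0;               // must equal the sum of all queue sizes
    std::size_t queue_capacity = 0;                 // threshold above which the sender blocks
    bool ready = true;                              // models the queue dependency state

    void push(int instance_id, int package) {
        instance_queues[instance_id].push(package);
        ++total_queue_size;
        if (total_queue_size > queue_capacity) {
            ready = false; // over capacity: the sender is blocked
        }
    }

    // Drop everything queued for a receiver that reported EOF.
    void set_receiver_eof(int instance_id) {
        std::queue<int>& q = instance_queues[instance_id];
        for (; !q.empty(); q.pop()) {
            --total_queue_size; // keep the global counter in step with the queue
        }
        if (total_queue_size <= queue_capacity) {
            ready = true; // back under capacity: wake the sender
        }
    }

    // Recompute the expected counter value directly from the queues.
    std::size_t sum_of_queues() const {
        std::size_t sum = 0;
        for (const auto& entry : instance_queues) {
            sum += entry.second.size();
        }
        return sum;
    }
};
```

With three receivers holding two packages each and an assumed capacity of 4, the model starts at a total of 6 and is blocked. After EOF for one receiver, the total drops to 4 and `ready` becomes true again. Without the decrement in the loop, the total would stay at 6 and nothing would ever unblock the sender.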

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
