This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ff1ca5b40e [fix](pipeline) Fix mem control in local exchanger (#38885)
0ff1ca5b40e is described below

commit 0ff1ca5b40ef5d5dbf83355402b97fba8ed947b4
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Aug 6 09:59:35 2024 +0800

    [fix](pipeline) Fix mem control in local exchanger (#38885)
    
    ## Proposed changes
    
    If a block (>128M) is dequeue by local exchange source operator and it
    is the last block, both of source operators and sink operators will be
    hang. This PR fixed it.
    
    <!--Describe your changes.-->
---
 be/src/pipeline/dependency.h | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 171947ba1c7..ee1afaaf55e 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -859,13 +859,13 @@ public:
     }
 
     void add_total_mem_usage(size_t delta) {
-        if (mem_usage.fetch_add(delta) > 
config::local_exchange_buffer_mem_limit) {
+        if (mem_usage.fetch_add(delta) + delta > 
config::local_exchange_buffer_mem_limit) {
             sink_deps.front()->block();
         }
     }
 
     void sub_total_mem_usage(size_t delta) {
-        if (mem_usage.fetch_sub(delta) <= 
config::local_exchange_buffer_mem_limit) {
+        if (mem_usage.fetch_sub(delta) - delta <= 
config::local_exchange_buffer_mem_limit) {
             sink_deps.front()->set_ready();
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to