This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0ff1ca5b40e [fix](pipeline) Fix mem control in local exchanger (#38885) 0ff1ca5b40e is described below commit 0ff1ca5b40ef5d5dbf83355402b97fba8ed947b4 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Aug 6 09:59:35 2024 +0800 [fix](pipeline) Fix mem control in local exchanger (#38885) ## Proposed changes If a block (>128M) is dequeue by local exchange source operator and it is the last block, both of source operators and sink operators will be hang. This PR fixed it. <!--Describe your changes.--> --- be/src/pipeline/dependency.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 171947ba1c7..ee1afaaf55e 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -859,13 +859,13 @@ public: } void add_total_mem_usage(size_t delta) { - if (mem_usage.fetch_add(delta) > config::local_exchange_buffer_mem_limit) { + if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) { sink_deps.front()->block(); } } void sub_total_mem_usage(size_t delta) { - if (mem_usage.fetch_sub(delta) <= config::local_exchange_buffer_mem_limit) { + if (mem_usage.fetch_sub(delta) - delta <= config::local_exchange_buffer_mem_limit) { sink_deps.front()->set_ready(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org