This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 924336535f9 branch-4.0: [fix](profile) Fix inaccurate accounting of 
memory_used_counter in ExchangeSink #59374 (#59438)
924336535f9 is described below

commit 924336535f95388e51b718a4db8259adfbd7fc23
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 30 09:19:18 2025 +0800

    branch-4.0: [fix](profile) Fix inaccurate accounting of memory_used_counter 
in ExchangeSink #59374 (#59438)
    
    Cherry-picked from #59374
    
    Co-authored-by: Mryange <[email protected]>
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 24 ++++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 71a0edf31b7..b27bef6a0b3 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -379,6 +379,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         return Status::OK();
     }
 
+    auto get_serializer_mem_bytes = [&local_state]() -> int64_t {
+        int64_t mem_usage = local_state._serializer.mem_usage();
+        for (auto& channel : local_state.channels) {
+            mem_usage += channel->mem_usage();
+        }
+        return mem_usage;
+    };
+
+    int64_t before_serializer_mem_bytes = get_serializer_mem_bytes();
+
     auto send_to_current_channel = [&]() -> Status {
         // 1. select channel
         auto& current_channel = 
local_state.channels[local_state.current_channel_idx];
@@ -421,11 +431,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
             auto block_holder = 
vectorized::BroadcastPBlockHolder::create_shared();
             {
                 bool serialized = false;
-                int64_t old_block_mem_bytes = 
local_state._serializer.mem_usage();
-                Defer update_mem([&]() {
-                    COUNTER_UPDATE(local_state.memory_used_counter(),
-                                   local_state._serializer.mem_usage() - 
old_block_mem_bytes);
-                });
                 RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
                         block, block_holder->get_block(), 
local_state._rpc_channels_num,
                         &serialized, eos));
@@ -499,13 +504,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         // 2. dispatch rows to channel
     }
 
+    int64_t after_serializer_mem_bytes = get_serializer_mem_bytes();
+
+    int64_t delta_mem_bytes = after_serializer_mem_bytes - 
before_serializer_mem_bytes;
+    COUNTER_UPDATE(local_state.memory_used_counter(), delta_mem_bytes);
+
     Status final_st = Status::OK();
     if (eos) {
-        int64_t block_mem_bytes = local_state._serializer.mem_usage();
-        COUNTER_UPDATE(local_state.memory_used_counter(), -block_mem_bytes);
+        COUNTER_UPDATE(local_state.memory_used_counter(), 
-after_serializer_mem_bytes);
         local_state._serializer.reset_block();
         for (auto& channel : local_state.channels) {
-            COUNTER_UPDATE(local_state.memory_used_counter(), 
-channel->mem_usage());
             Status st = channel->close(state);
             /**
              * Consider this case below:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to