This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 924336535f9 branch-4.0: [fix](profile) Fix inaccurate accounting of memory_used_counter in ExchangeSink #59374 (#59438)
924336535f9 is described below
commit 924336535f95388e51b718a4db8259adfbd7fc23
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 30 09:19:18 2025 +0800
branch-4.0: [fix](profile) Fix inaccurate accounting of memory_used_counter in ExchangeSink #59374 (#59438)
Cherry-picked from #59374
Co-authored-by: Mryange <[email protected]>
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 24 ++++++++++++++++--------
1 file changed, 16 insertions(+), 8 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 71a0edf31b7..b27bef6a0b3 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -379,6 +379,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
return Status::OK();
}
+ auto get_serializer_mem_bytes = [&local_state]() -> int64_t {
+ int64_t mem_usage = local_state._serializer.mem_usage();
+ for (auto& channel : local_state.channels) {
+ mem_usage += channel->mem_usage();
+ }
+ return mem_usage;
+ };
+
+ int64_t before_serializer_mem_bytes = get_serializer_mem_bytes();
+
auto send_to_current_channel = [&]() -> Status {
// 1. select channel
auto& current_channel =
local_state.channels[local_state.current_channel_idx];
@@ -421,11 +431,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
auto block_holder =
vectorized::BroadcastPBlockHolder::create_shared();
{
bool serialized = false;
- int64_t old_block_mem_bytes = local_state._serializer.mem_usage();
- Defer update_mem([&]() {
- COUNTER_UPDATE(local_state.memory_used_counter(),
- local_state._serializer.mem_usage() - old_block_mem_bytes);
- });
RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
block, block_holder->get_block(),
local_state._rpc_channels_num,
&serialized, eos));
@@ -499,13 +504,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
// 2. dispatch rows to channel
}
+ int64_t after_serializer_mem_bytes = get_serializer_mem_bytes();
+
+ int64_t delta_mem_bytes = after_serializer_mem_bytes - before_serializer_mem_bytes;
+ COUNTER_UPDATE(local_state.memory_used_counter(), delta_mem_bytes);
+
Status final_st = Status::OK();
if (eos) {
- int64_t block_mem_bytes = local_state._serializer.mem_usage();
- COUNTER_UPDATE(local_state.memory_used_counter(), -block_mem_bytes);
+ COUNTER_UPDATE(local_state.memory_used_counter(), -after_serializer_mem_bytes);
local_state._serializer.reset_block();
for (auto& channel : local_state.channels) {
- COUNTER_UPDATE(local_state.memory_used_counter(), -channel->mem_usage());
Status st = channel->close(state);
/**
* Consider this case below:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]