This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_repartition by this push:
     new 23294426217 fix agg revocable mem size
23294426217 is described below

commit 232944262176ac446f909efd67b35e6c5467ab69
Author: Hu Shenggang <[email protected]>
AuthorDate: Sun Mar 1 19:54:46 2026 +0800

    fix agg revocable mem size
---
 .../partitioned_aggregation_source_operator.cpp    | 30 +++++++++++++++++++---
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 8de7d8a4411..c5863072fa2 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -52,6 +52,9 @@ Status PartitionedAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info)
     // Counters for partition spill metrics
     _max_partition_level = ADD_COUNTER(custom_profile(), 
"SpillMaxPartitionLevel", TUnit::UNIT);
     _total_partition_spills = ADD_COUNTER(custom_profile(), 
"SpillTotalPartitions", TUnit::UNIT);
+
+    init_spill_write_counters();
+
     // Nothing else to init for repartitioner here; fanout is configured when
     // repartitioner is initialized with key columns during actual repartition.
     return Status::OK();
@@ -165,12 +168,31 @@ bool 
PartitionedAggSourceOperatorX::is_shuffled_operator() const {
 
 size_t PartitionedAggSourceOperatorX::revocable_mem_size(RuntimeState* state) 
const {
     auto& local_state = get_local_state(state);
-    if (!local_state._shared_state->_is_spilled) {
+    if (!local_state._shared_state->_is_spilled || 
!local_state._current_partition.has_data()) {
         return 0;
     }
-    return local_state._estimate_memory_usage < 
state->spill_min_revocable_mem()
-                   ? 0
-                   : local_state._estimate_memory_usage;
+
+    size_t bytes = 0;
+    for (const auto& block : local_state._blocks) {
+        bytes += block.allocated_bytes();
+    }
+    if (local_state._shared_state->_in_mem_shared_state != nullptr &&
+        local_state._shared_state->_in_mem_shared_state->agg_data != nullptr) {
+        auto* agg_data = 
local_state._shared_state->_in_mem_shared_state->agg_data.get();
+        bytes += std::visit(
+                vectorized::Overload {[&](std::monostate& arg) -> size_t { 
return 0; },
+                                      [&](auto& agg_method) -> size_t {
+                                          return 
agg_method.hash_table->get_buffer_size_in_bytes();
+                                      }},
+                agg_data->method_variant);
+
+        if (auto& aggregate_data_container =
+                    
local_state._shared_state->_in_mem_shared_state->aggregate_data_container;
+            aggregate_data_container) {
+            bytes += aggregate_data_container->memory_usage();
+        }
+    }
+    return bytes;
 }
 
 Status PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to