This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit cb1b116c75bb3cd288b0a2c95c21b6a5046456d9
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Tue Sep 3 18:14:41 2024 +0800

    updated
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp         | 14 +++++++++++++-
 .../exec/partitioned_aggregation_source_operator.cpp       |  6 ++++--
 .../exec/partitioned_aggregation_source_operator.h         |  2 ++
 be/src/pipeline/exec/spill_sort_source_operator.cpp        | 10 ++++++++--
 be/src/pipeline/exec/spill_sort_source_operator.h          |  2 ++
 be/src/pipeline/pipeline_fragment_context.cpp              |  2 +-
 6 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 51734bbd5ee..56e2c796667 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -719,7 +719,19 @@ Status AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs&
 }
 
 size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state) const {
-    return _memory_usage();
+    size_t size_to_reserve = std::visit(
+            [&](auto&& arg) -> size_t {
+                using HashTableCtxType = std::decay_t<decltype(arg)>;
+                if constexpr (std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                    return 0;
+                } else {
+                    return 
arg.hash_table->estimate_memory(state->batch_size());
+                }
+            },
+            _agg_data->method_variant);
+
+    size_to_reserve += _memory_usage_last_executing;
+    return size_to_reserve;
 }
 
 AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const 
TPlanNode& tnode,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 48df5587198..bdbd395ee99 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -48,6 +48,8 @@ Status PartitionedAggLocalState::open(RuntimeState* state) {
         return Status::OK();
     }
     _opened = true;
+    _spill_dependency = state->get_spill_dependency();
+    DCHECK(_spill_dependency != nullptr);
     RETURN_IF_ERROR(setup_in_memory_agg_op(state));
     return Status::OK();
 }
@@ -200,7 +202,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                << " merge spilled agg data";
 
     
RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
-    _dependency->Dependency::block();
+    _spill_dependency->Dependency::block();
 
     auto query_id = state->query_id();
 
@@ -222,7 +224,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
             }
             
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
             _is_merging = false;
-            _dependency->Dependency::set_ready();
+            _spill_dependency->Dependency::set_ready();
         }};
         bool has_agg_data = false;
         auto& parent = Base::_parent->template cast<Parent>();
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index edae99c716a..c09046d840a 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -59,6 +59,8 @@ protected:
     bool _current_partition_eos = true;
     bool _is_merging = false;
 
+    Dependency* _spill_dependency {nullptr};
+
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     RuntimeProfile::Counter* _get_results_timer = nullptr;
     RuntimeProfile::Counter* _serialize_result_timer = nullptr;
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index e766cb27168..c3f9f633cd3 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -17,6 +17,8 @@
 
 #include "spill_sort_source_operator.h"
 
+#include <glog/logging.h>
+
 #include "common/status.h"
 #include "pipeline/exec/spill_utils.h"
 #include "runtime/fragment_mgr.h"
@@ -58,6 +60,10 @@ Status SpillSortLocalState::open(RuntimeState* state) {
     if (_opened) {
         return Status::OK();
     }
+
+    _spill_dependency = state->get_spill_dependency();
+    DCHECK(_spill_dependency != nullptr);
+
     RETURN_IF_ERROR(setup_in_memory_sort_op(state));
     return Base::open(state);
 }
@@ -77,7 +83,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
     auto& parent = Base::_parent->template cast<Parent>();
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << 
_parent->node_id()
                << " merge spill data";
-    _dependency->Dependency::block();
+    _spill_dependency->Dependency::block();
 
     auto query_id = state->query_id();
 
@@ -102,7 +108,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                 VLOG_DEBUG << "query " << print_id(query_id) << " sort node " 
<< _parent->node_id()
                            << " merge spill data finish";
             }
-            _dependency->Dependency::set_ready();
+            _spill_dependency->Dependency::set_ready();
         }};
         vectorized::Block merge_sorted_block;
         vectorized::SpillStreamSPtr tmp_stream;
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h
index 66d05e739d8..5674e18ef69 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -60,6 +60,8 @@ protected:
     std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
 
+    Dependency* _spill_dependency {nullptr};
+
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     // counters for spill merge sort
     RuntimeProfile::Counter* _spill_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index e082bb1980f..71dc601f014 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1834,7 +1834,7 @@ size_t PipelineFragmentContext::get_revocable_size(bool& has_running_task) const
 
             size_t revocable_size_ = task->get_revocable_size();
             if (revocable_size_ > _runtime_state->min_revocable_mem()) {
-                revocable_size += task->get_revocable_size();
+                revocable_size += revocable_size_;
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to