This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit cb1b116c75bb3cd288b0a2c95c21b6a5046456d9 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Tue Sep 3 18:14:41 2024 +0800 updated --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 14 +++++++++++++- .../exec/partitioned_aggregation_source_operator.cpp | 6 ++++-- .../exec/partitioned_aggregation_source_operator.h | 2 ++ be/src/pipeline/exec/spill_sort_source_operator.cpp | 10 ++++++++-- be/src/pipeline/exec/spill_sort_source_operator.h | 2 ++ be/src/pipeline/pipeline_fragment_context.cpp | 2 +- 6 files changed, 30 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 51734bbd5ee..56e2c796667 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -719,7 +719,19 @@ Status AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& } size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state) const { - return _memory_usage(); + size_t size_to_reserve = std::visit( + [&](auto&& arg) -> size_t { + using HashTableCtxType = std::decay_t<decltype(arg)>; + if constexpr (std::is_same_v<HashTableCtxType, std::monostate>) { + return 0; + } else { + return arg.hash_table->estimate_memory(state->batch_size()); + } + }, + _agg_data->method_variant); + + size_to_reserve += _memory_usage_last_executing; + return size_to_reserve; } AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 48df5587198..bdbd395ee99 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -48,6 +48,8 @@ Status PartitionedAggLocalState::open(RuntimeState* state) { return Status::OK(); } _opened = true; + _spill_dependency = state->get_spill_dependency(); + DCHECK(_spill_dependency != nullptr); RETURN_IF_ERROR(setup_in_memory_agg_op(state)); return Status::OK(); } @@ -200,7 +202,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime << " merge spilled agg data"; RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table()); - _dependency->Dependency::block(); + _spill_dependency->Dependency::block(); auto query_id = state->query_id(); @@ -222,7 +224,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime } Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once(); _is_merging = false; - _dependency->Dependency::set_ready(); + _spill_dependency->Dependency::set_ready(); }}; bool has_agg_data = false; auto& parent = Base::_parent->template cast<Parent>(); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index edae99c716a..c09046d840a 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -59,6 +59,8 @@ protected: bool _current_partition_eos = true; bool _is_merging = false; + Dependency* _spill_dependency {nullptr}; + std::unique_ptr<RuntimeProfile> _internal_runtime_profile; RuntimeProfile::Counter* _get_results_timer = nullptr; RuntimeProfile::Counter* _serialize_result_timer = nullptr; diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index e766cb27168..c3f9f633cd3 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -17,6 +17,8 @@ #include "spill_sort_source_operator.h" +#include <glog/logging.h> + #include "common/status.h" #include "pipeline/exec/spill_utils.h" #include "runtime/fragment_mgr.h" @@ -58,6 +60,10 @@ Status SpillSortLocalState::open(RuntimeState* state) { if (_opened) { return Status::OK(); } + + _spill_dependency = state->get_spill_dependency(); + DCHECK(_spill_dependency != nullptr); + RETURN_IF_ERROR(setup_in_memory_sort_op(state)); return Base::open(state); } @@ -77,7 +83,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat auto& parent = Base::_parent->template cast<Parent>(); VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << _parent->node_id() << " merge spill data"; - _dependency->Dependency::block(); + _spill_dependency->Dependency::block(); auto query_id = state->query_id(); @@ -102,7 +108,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id() << " merge spill data finish"; } - _dependency->Dependency::set_ready(); + _spill_dependency->Dependency::set_ready(); }}; vectorized::Block merge_sorted_block; vectorized::SpillStreamSPtr tmp_stream; diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index 66d05e739d8..5674e18ef69 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -60,6 +60,8 @@ protected: std::vector<vectorized::SpillStreamSPtr> _current_merging_streams; std::unique_ptr<vectorized::VSortedRunMerger> _merger; + Dependency* _spill_dependency {nullptr}; + std::unique_ptr<RuntimeProfile> _internal_runtime_profile; // counters for spill merge sort RuntimeProfile::Counter* _spill_timer = nullptr; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index e082bb1980f..71dc601f014 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1834,7 +1834,7 @@ size_t PipelineFragmentContext::get_revocable_size(bool& has_running_task) const size_t revocable_size_ = task->get_revocable_size(); if (revocable_size_ > _runtime_state->min_revocable_mem()) { - revocable_size += task->get_revocable_size(); + revocable_size += revocable_size_; } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org