This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit 18a711d5115b50b57733bc1910cbcf8be4097bd9 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Thu Sep 12 17:11:50 2024 +0800 Use SCOPED_PEAK_MEM to replace ScopedMemTracker --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 8 ++++---- be/src/pipeline/exec/aggregation_sink_operator.h | 2 +- .../pipeline/exec/aggregation_source_operator.cpp | 4 ++-- be/src/pipeline/exec/analytic_sink_operator.cpp | 2 +- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- be/src/pipeline/exec/analytic_source_operator.cpp | 2 +- be/src/pipeline/exec/assert_num_rows_operator.cpp | 2 +- be/src/pipeline/exec/datagen_operator.cpp | 2 +- .../exec/nested_loop_join_probe_operator.cpp | 8 ++++---- be/src/pipeline/exec/operator.h | 22 ++-------------------- be/src/pipeline/exec/repeat_operator.cpp | 4 ++-- be/src/pipeline/exec/set_probe_sink_operator.cpp | 2 +- be/src/pipeline/exec/set_probe_sink_operator.h | 2 +- be/src/pipeline/exec/set_source_operator.cpp | 2 +- be/src/pipeline/exec/sort_source_operator.cpp | 2 +- .../exec/streaming_aggregation_operator.cpp | 4 ++-- be/src/pipeline/exec/union_source_operator.cpp | 2 +- 17 files changed, 27 insertions(+), 45 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 56e2c796667..9597371057e 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -178,7 +178,7 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) { DCHECK(_agg_data->without_key != nullptr); SCOPED_TIMER(_build_timer); _memory_usage_last_executing = 0; - ScopedMemTracker mem_tracker(_memory_usage_last_executing); + SCOPED_PEAK_MEM(&_memory_usage_last_executing); for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add( block, @@ -191,7 +191,7 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) { Status AggSinkLocalState::_merge_with_serialized_key(vectorized::Block* block) { _memory_usage_last_executing = 0; - ScopedMemTracker mem_tracker(_memory_usage_last_executing); + SCOPED_PEAK_MEM(&_memory_usage_last_executing); if (_shared_state->reach_limit) { return _merge_with_serialized_key_helper<true, false>(block); } else { @@ -401,7 +401,7 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) { DCHECK(_agg_data->without_key != nullptr); _memory_usage_last_executing = 0; - ScopedMemTracker mem_tracker(_memory_usage_last_executing); + SCOPED_PEAK_MEM(&_memory_usage_last_executing); for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) { int col_id = AggSharedState::get_slot_column_id( @@ -440,7 +440,7 @@ void AggSinkLocalState::_update_memusage_without_key() { Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) { _memory_usage_last_executing = 0; - ScopedMemTracker mem_tracker(_memory_usage_last_executing); + SCOPED_PEAK_MEM(&_memory_usage_last_executing); if (_shared_state->reach_limit) { return _execute_with_serialized_key_helper<true>(block); } else { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 129aea5eb76..39f11f6270f 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -126,7 +126,7 @@ protected: std::unique_ptr<ExecutorBase> _executor = nullptr; - size_t _memory_usage_last_executing = 0; + int64_t _memory_usage_last_executing = 0; }; class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> { diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index c1cb187f3e6..6df089bbb5b 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -440,7 +440,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - ScopedMemTracker scoped_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg @@ -482,7 +482,7 @@ void AggLocalState::make_nullable_output_key(vectorized::Block* block) { template <bool limit> Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block) { SCOPED_TIMER(_merge_timer); - ScopedMemTracker scoped_tracker(_estimate_memory_usage); + SCOPED_PEAK_MEM(&_estimate_memory_usage); size_t key_size = Base::_shared_state->probe_expr_ctxs.size(); vectorized::ColumnRawPtrs key_columns(key_size); diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 6e3aeec283e..903a04a47af 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -266,7 +266,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); local_state._reserve_mem_size = 0; - ScopedMemTracker mem_tracker(local_state._reserve_mem_size); + SCOPED_PEAK_MEM(&local_state._reserve_mem_size); local_state._shared_state->input_eos = eos; if (local_state._shared_state->input_eos && input_block->rows() == 0) { diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index a82a1e41044..e5a32cfb3eb 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -62,7 +62,7 @@ private: std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs; - size_t _reserve_mem_size = 0; + int64_t _reserve_mem_size = 0; }; class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> { diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 2661314592d..8963ff361d1 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -512,7 +512,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); if (local_state._shared_state->input_eos && (local_state._output_block_index == local_state._shared_state->input_blocks.size() || local_state._shared_state->input_total_rows == 0)) { diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index 6c6a28029e2..f83569b1b34 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -42,7 +42,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage()); + SCOPED_PEAK_MEM(&local_state.estimate_memory_usage()); local_state.add_num_rows_returned(block->rows()); int64_t num_rows_returned = local_state.num_rows_returned(); bool assert_res = false; diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 03fe9db37bd..bfd1085e535 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -68,7 +68,7 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage()); + SCOPED_PEAK_MEM(&local_state.estimate_memory_usage()); Status res = local_state._table_func->get_next(state, block, eos); RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); local_state.reached_limit(block, eos); diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index bb53755edb2..0a1795544dd 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -466,7 +466,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized bool eos) const { auto& local_state = get_local_state(state); COUNTER_UPDATE(local_state._probe_rows_counter, block->rows()); - ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage()); + SCOPED_PEAK_MEM(&local_state.estimate_memory_usage()); local_state._cur_probe_row_visited_flags.resize(block->rows()); std::fill(local_state._cur_probe_row_visited_flags.begin(), local_state._cur_probe_row_visited_flags.end(), 0); @@ -493,12 +493,12 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block bool* eos) const { auto& local_state = get_local_state(state); if (_is_output_left_side_only) { - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block)); *eos = local_state._shared_state->left_side_eos; local_state._need_more_input_data = !local_state._shared_state->left_side_eos; } else { - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); *eos = ((_match_all_build || _is_right_semi_anti) ? local_state._output_null_idx_build_side == local_state._shared_state->build_blocks.size() && @@ -531,7 +531,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block state, join_op_variants); }; SCOPED_TIMER(local_state._loop_join_timer); - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); RETURN_IF_ERROR(std::visit( func, local_state._shared_state->join_op_variants, vectorized::make_bool_variant(_match_all_build || _is_right_semi_anti), diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 6e0b332c0a2..6bda3ac0733 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -193,7 +193,7 @@ public: void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage += usage; } - size_t& estimate_memory_usage() { return _estimate_memory_usage; } + int64_t& estimate_memory_usage() { return _estimate_memory_usage; } void reset_estimate_memory_usage() { _estimate_memory_usage = 0; } @@ -204,7 +204,7 @@ protected: ObjectPool* _pool = nullptr; int64_t _num_rows_returned {0}; - size_t _estimate_memory_usage {0}; + int64_t _estimate_memory_usage {0}; std::unique_ptr<RuntimeProfile> _runtime_profile; @@ -237,24 +237,6 @@ protected: vectorized::Block _origin_block; }; -class ScopedMemTracker { -public: - ScopedMemTracker(size_t& counter) : _counter(counter), _mem_tracker("ScopedMemTracker") { - thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(&_mem_tracker); - _peak_usage = _mem_tracker.peak_consumption(); - } - - ~ScopedMemTracker() { - thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker(); - _counter += (_mem_tracker.peak_consumption() - _peak_usage); - } - -private: - size_t& _counter; - size_t _peak_usage = 0; - MemTracker _mem_tracker; -}; - template <typename SharedStateArg = FakeSharedState> class PipelineXLocalState : public PipelineXLocalStateBase { public: diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index 3706a9262fb..13e5ceda857 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -171,7 +171,7 @@ Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block auto& _expr_ctxs = local_state._expr_ctxs; DCHECK(!_intermediate_block || _intermediate_block->rows() == 0); if (input_block->rows() > 0) { - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); _intermediate_block = vectorized::Block::create_unique(); for (auto& expr : _expr_ctxs) { @@ -198,7 +198,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp auto& _intermediate_block = local_state._intermediate_block; RETURN_IF_CANCELLED(state); - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); DCHECK(_repeat_id_idx >= 0); for (const std::vector<int64_t>& v : _grouping_list) { diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 9d20456e093..7e62892f516 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -68,7 +68,7 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - ScopedMemTracker mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); auto probe_rows = in_block->rows(); if (probe_rows > 0) { diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index a19479e281a..542f6652dd4 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -55,7 +55,7 @@ private: template <class HashTableContext, bool is_intersected> friend struct vectorized::HashTableProbe; - size_t _estimate_memory_usage = 0; + int64_t _estimate_memory_usage = 0; //record insert column id during probe std::vector<uint16_t> _probe_column_inserted_id; diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index 6c3260ba850..d1c42ae3c08 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -74,7 +74,7 @@ Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectoriz RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); _create_mutable_cols(local_state, block); auto st = std::visit( diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 7e657859ce9..30335904442 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -53,7 +53,7 @@ Status SortSourceOperatorX::open(RuntimeState* state) { Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, eos)); local_state.reached_limit(block, eos); diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 38dfee5c46d..63459a119c4 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -1274,7 +1274,7 @@ Status StreamingAggLocalState::close(RuntimeState* state) { Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, bool* eos) const { auto& local_state = get_local_state(state); - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); if (!local_state._pre_aggregated_block->empty()) { local_state._pre_aggregated_block->swap(*block); } else { @@ -1291,7 +1291,7 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_block, bool eos) const { auto& local_state = get_local_state(state); - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); local_state._input_num_rows += in_block->rows(); if (in_block->rows() > 0) { diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 24d172f6708..18058c95cae 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -141,7 +141,7 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Blo auto& local_state = state->get_local_state(operator_id())->cast<UnionSourceLocalState>(); DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size()); - ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage); + SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); auto& _const_expr_list_idx = local_state._const_expr_list_idx; vectorized::MutableBlock mblock = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org