This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 18a711d5115b50b57733bc1910cbcf8be4097bd9
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Thu Sep 12 17:11:50 2024 +0800

    Use SCOPED_PEAK_MEM to replace ScopedMemTracker
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  8 ++++----
 be/src/pipeline/exec/aggregation_sink_operator.h   |  2 +-
 .../pipeline/exec/aggregation_source_operator.cpp  |  4 ++--
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |  2 +-
 be/src/pipeline/exec/analytic_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/assert_num_rows_operator.cpp  |  2 +-
 be/src/pipeline/exec/datagen_operator.cpp          |  2 +-
 .../exec/nested_loop_join_probe_operator.cpp       |  8 ++++----
 be/src/pipeline/exec/operator.h                    | 22 ++--------------------
 be/src/pipeline/exec/repeat_operator.cpp           |  4 ++--
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  2 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |  2 +-
 be/src/pipeline/exec/set_source_operator.cpp       |  2 +-
 be/src/pipeline/exec/sort_source_operator.cpp      |  2 +-
 .../exec/streaming_aggregation_operator.cpp        |  4 ++--
 be/src/pipeline/exec/union_source_operator.cpp     |  2 +-
 17 files changed, 27 insertions(+), 45 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 56e2c796667..9597371057e 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -178,7 +178,7 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
     DCHECK(_agg_data->without_key != nullptr);
     SCOPED_TIMER(_build_timer);
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
         RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add(
                 block,
@@ -191,7 +191,7 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
 
 Status AggSinkLocalState::_merge_with_serialized_key(vectorized::Block* block) {
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     if (_shared_state->reach_limit) {
         return _merge_with_serialized_key_helper<true, false>(block);
     } else {
@@ -401,7 +401,7 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
     DCHECK(_agg_data->without_key != nullptr);
 
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
         if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) {
             int col_id = AggSharedState::get_slot_column_id(
@@ -440,7 +440,7 @@ void AggSinkLocalState::_update_memusage_without_key() {
 
 Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) {
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     if (_shared_state->reach_limit) {
         return _execute_with_serialized_key_helper<true>(block);
     } else {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 129aea5eb76..39f11f6270f 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -126,7 +126,7 @@ protected:
 
     std::unique_ptr<ExecutorBase> _executor = nullptr;
 
-    size_t _memory_usage_last_executing = 0;
+    int64_t _memory_usage_last_executing = 0;
 };
 
 class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index c1cb187f3e6..6df089bbb5b 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -440,7 +440,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
 Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
     local_state.make_nullable_output_key(block);
     // dispose the having clause, should not be execute in prestreaming agg
@@ -482,7 +482,7 @@ void AggLocalState::make_nullable_output_key(vectorized::Block* block) {
 template <bool limit>
 Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block) {
     SCOPED_TIMER(_merge_timer);
-    ScopedMemTracker scoped_tracker(_estimate_memory_usage);
+    SCOPED_PEAK_MEM(&_estimate_memory_usage);
 
     size_t key_size = Base::_shared_state->probe_expr_ctxs.size();
     vectorized::ColumnRawPtrs key_columns(key_size);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 6e3aeec283e..903a04a47af 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -266,7 +266,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
 
     local_state._reserve_mem_size = 0;
-    ScopedMemTracker mem_tracker(local_state._reserve_mem_size);
+    SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
 
     local_state._shared_state->input_eos = eos;
     if (local_state._shared_state->input_eos && input_block->rows() == 0) {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index a82a1e41044..e5a32cfb3eb 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -62,7 +62,7 @@ private:
 
     std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
 
-    size_t _reserve_mem_size = 0;
+    int64_t _reserve_mem_size = 0;
 };
 
 class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index 2661314592d..8963ff361d1 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -512,7 +512,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
                                           bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     if (local_state._shared_state->input_eos &&
         (local_state._output_block_index == local_state._shared_state->input_blocks.size() ||
          local_state._shared_state->input_total_rows == 0)) {
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 6c6a28029e2..f83569b1b34 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -42,7 +42,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
                                     bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+    SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
     local_state.add_num_rows_returned(block->rows());
     int64_t num_rows_returned = local_state.num_rows_returned();
     bool assert_res = false;
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index 03fe9db37bd..bfd1085e535 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -68,7 +68,7 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+    SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
     Status res = local_state._table_func->get_next(state, block, eos);
     RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns()));
     local_state.reached_limit(block, eos);
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index bb53755edb2..0a1795544dd 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -466,7 +466,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized
                                           bool eos) const {
     auto& local_state = get_local_state(state);
     COUNTER_UPDATE(local_state._probe_rows_counter, block->rows());
-    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+    SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
     local_state._cur_probe_row_visited_flags.resize(block->rows());
     std::fill(local_state._cur_probe_row_visited_flags.begin(),
               local_state._cur_probe_row_visited_flags.end(), 0);
@@ -493,12 +493,12 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
                                           bool* eos) const {
     auto& local_state = get_local_state(state);
     if (_is_output_left_side_only) {
-        ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
         RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block));
         *eos = local_state._shared_state->left_side_eos;
         local_state._need_more_input_data = !local_state._shared_state->left_side_eos;
     } else {
-        ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
         *eos = ((_match_all_build || _is_right_semi_anti)
                         ? local_state._output_null_idx_build_side ==
                                           local_state._shared_state->build_blocks.size() &&
@@ -531,7 +531,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
                                 state, join_op_variants);
             };
             SCOPED_TIMER(local_state._loop_join_timer);
-            ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+            SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
             RETURN_IF_ERROR(std::visit(
                     func, local_state._shared_state->join_op_variants,
                     vectorized::make_bool_variant(_match_all_build || _is_right_semi_anti),
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 6e0b332c0a2..6bda3ac0733 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -193,7 +193,7 @@ public:
 
     void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage += usage; }
 
-    size_t& estimate_memory_usage() { return _estimate_memory_usage; }
+    int64_t& estimate_memory_usage() { return _estimate_memory_usage; }
 
     void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }
 
@@ -204,7 +204,7 @@ protected:
 
     ObjectPool* _pool = nullptr;
     int64_t _num_rows_returned {0};
-    size_t _estimate_memory_usage {0};
+    int64_t _estimate_memory_usage {0};
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
 
@@ -237,24 +237,6 @@ protected:
     vectorized::Block _origin_block;
 };
 
-class ScopedMemTracker {
-public:
-    ScopedMemTracker(size_t& counter) : _counter(counter), _mem_tracker("ScopedMemTracker") {
-        thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(&_mem_tracker);
-        _peak_usage = _mem_tracker.peak_consumption();
-    }
-
-    ~ScopedMemTracker() {
-        thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
-        _counter += (_mem_tracker.peak_consumption() - _peak_usage);
-    }
-
-private:
-    size_t& _counter;
-    size_t _peak_usage = 0;
-    MemTracker _mem_tracker;
-};
-
 template <typename SharedStateArg = FakeSharedState>
 class PipelineXLocalState : public PipelineXLocalStateBase {
 public:
diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp
index 3706a9262fb..13e5ceda857 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -171,7 +171,7 @@ Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block
     auto& _expr_ctxs = local_state._expr_ctxs;
     DCHECK(!_intermediate_block || _intermediate_block->rows() == 0);
     if (input_block->rows() > 0) {
-        ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
         _intermediate_block = vectorized::Block::create_unique();
 
         for (auto& expr : _expr_ctxs) {
@@ -198,7 +198,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp
     auto& _intermediate_block = local_state._intermediate_block;
     RETURN_IF_CANCELLED(state);
 
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     DCHECK(_repeat_id_idx >= 0);
     for (const std::vector<int64_t>& v : _grouping_list) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 9d20456e093..7e62892f516 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -68,7 +68,7 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
-    ScopedMemTracker mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     auto probe_rows = in_block->rows();
     if (probe_rows > 0) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index a19479e281a..542f6652dd4 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -55,7 +55,7 @@ private:
     template <class HashTableContext, bool is_intersected>
     friend struct vectorized::HashTableProbe;
 
-    size_t _estimate_memory_usage = 0;
+    int64_t _estimate_memory_usage = 0;
 
     //record insert column id during probe
     std::vector<uint16_t> _probe_column_inserted_id;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
index 6c3260ba850..d1c42ae3c08 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -74,7 +74,7 @@ Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectoriz
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     _create_mutable_cols(local_state, block);
     auto st = std::visit(
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp
index 7e657859ce9..30335904442 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -53,7 +53,7 @@ Status SortSourceOperatorX::open(RuntimeState* state) {
 Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, eos));
     local_state.reached_limit(block, eos);
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 38dfee5c46d..63459a119c4 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1274,7 +1274,7 @@ Status StreamingAggLocalState::close(RuntimeState* state) {
 
 Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, bool* eos) const {
     auto& local_state = get_local_state(state);
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     if (!local_state._pre_aggregated_block->empty()) {
         local_state._pre_aggregated_block->swap(*block);
     } else {
@@ -1291,7 +1291,7 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block
 Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_block,
                                    bool eos) const {
     auto& local_state = get_local_state(state);
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     local_state._input_num_rows += in_block->rows();
     if (in_block->rows() > 0) {
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 24d172f6708..18058c95cae 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -141,7 +141,7 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Blo
     auto& local_state = state->get_local_state(operator_id())->cast<UnionSourceLocalState>();
     DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size());
 
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     auto& _const_expr_list_idx = local_state._const_expr_list_idx;
     vectorized::MutableBlock mblock =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to