This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 10483ea12c7 [fix](profile) fix error set with peak_memory_usage in pipeline #27749
10483ea12c7 is described below

commit 10483ea12c7099a2dfcda46a428e5c52b33840ac
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Sat Dec 2 14:12:38 2023 +0800

    [fix](profile) fix error set with peak_memory_usage in pipeline #27749
---
 be/src/exec/exec_node.cpp                          |  6 +++---
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 12 ++++++------
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  5 +----
 be/src/pipeline/exec/exchange_sink_operator.h      |  1 -
 be/src/pipeline/pipeline_x/dependency.h            |  6 ------
 be/src/pipeline/pipeline_x/operator.cpp            |  9 +++++++++
 be/src/pipeline/pipeline_x/operator.h              |  2 ++
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  3 +++
 8 files changed, 24 insertions(+), 20 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 354f80d3c10..4681218dcae 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -200,6 +200,9 @@ void ExecNode::release_resource(doris::RuntimeState* state) {
 
         _is_resource_released = true;
     }
+    if (_peak_memory_usage_counter) {
+        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    }
 }
 
 Status ExecNode::close(RuntimeState* state) {
@@ -218,9 +221,6 @@ Status ExecNode::close(RuntimeState* state) {
             result = st;
         }
     }
-    if (_peak_memory_usage_counter) {
-        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
-    }
     release_resource(state);
     LOG(INFO) << "query= " << print_id(state->query_id())
               << ", fragment_instance_id=" << print_id(state->fragment_instance_id())
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index c2bb041abb2..49cbe30a3b2 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -243,10 +243,9 @@ void AggSinkLocalState<DependencyType, Derived>::_update_memusage_with_serialize
                         _agg_arena_pool->size() +
                         Base::_shared_state->aggregate_data_container->memory_usage() -
                         Base::_shared_state->mem_usage_record.used_in_arena;
-                Base::_shared_state->mem_tracker->consume(arena_memory_usage);
-                Base::_shared_state->mem_tracker->consume(
-                        data.get_buffer_size_in_bytes() -
-                        Base::_shared_state->mem_usage_record.used_in_state);
+                Base::_mem_tracker->consume(arena_memory_usage);
+                Base::_mem_tracker->consume(data.get_buffer_size_in_bytes() -
+                                            Base::_shared_state->mem_usage_record.used_in_state);
                 _serialize_key_arena_memory_usage->add(arena_memory_usage);
                 COUNTER_UPDATE(_hash_table_memory_usage,
                                data.get_buffer_size_in_bytes() -
@@ -438,7 +437,7 @@ template <typename DependencyType, typename Derived>
 void AggSinkLocalState<DependencyType, Derived>::_update_memusage_without_key() {
     auto arena_memory_usage =
             _agg_arena_pool->size() - Base::_shared_state->mem_usage_record.used_in_arena;
-    Base::_shared_state->mem_tracker->consume(arena_memory_usage);
+    Base::_mem_tracker->consume(arena_memory_usage);
     _serialize_key_arena_memory_usage->add(arena_memory_usage);
     Base::_shared_state->mem_usage_record.used_in_arena = _agg_arena_pool->size();
 }
@@ -877,7 +876,8 @@ Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state, St
 
     std::vector<char> tmp_deserialize_buffer;
     _deserialize_buffer.swap(tmp_deserialize_buffer);
-
+    Base::_mem_tracker->release(Base::_shared_state->mem_usage_record.used_in_state +
+                                Base::_shared_state->mem_usage_record.used_in_arena);
     return Base::close(state, exec_status);
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 1c66ec02207..20d612d5785 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -123,9 +123,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
     _local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES);
     _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
-    _peak_memory_usage_counter =
-            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
-
     static const std::string timer_name = "WaitForDependencyTime";
     _wait_for_dependency_timer = ADD_TIMER(_profile, timer_name);
     _wait_queue_timer = ADD_CHILD_TIMER(_profile, "WaitForRpcBufferQueue", timer_name);
@@ -312,7 +309,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
     COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
     SCOPED_TIMER(local_state.exec_time_counter());
-    local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption());
     bool all_receiver_eof = true;
     for (auto channel : local_state.channels) {
         if (!channel->is_receiver_eof()) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 34502928880..048c8d3910c 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -219,7 +219,6 @@ private:
     RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
     RuntimeProfile::Counter* _memory_usage_counter = nullptr;
-    RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
 
     RuntimeProfile::Counter* _wait_queue_timer = nullptr;
     RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 9fbb25aaa2f..49dc327c796 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -353,13 +353,9 @@ public:
         int64_t used_in_state;
     };
     MemoryRecord mem_usage_record;
-    std::unique_ptr<MemTracker> mem_tracker = std::make_unique<MemTracker>("AggregateOperator");
     bool agg_data_created_without_key = false;
 
 private:
-    void _release_tracker() {
-        mem_tracker->release(mem_usage_record.used_in_state + mem_usage_record.used_in_arena);
-    }
     void _close_with_serialized_key() {
         std::visit(
                 [&](auto&& agg_method) -> void {
@@ -379,7 +375,6 @@ private:
                     }
                 },
                 agg_data->method_variant);
-        _release_tracker();
     }
     void _close_without_key() {
         //because prepare maybe failed, and couldn't create agg data.
@@ -389,7 +384,6 @@ private:
             static_cast<void>(_destroy_agg_status(agg_data->without_key));
             agg_data_created_without_key = false;
         }
-        _release_tracker();
     }
     Status _destroy_agg_status(vectorized::AggregateDataPtr data) {
         for (int i = 0; i < aggregate_evaluators.size(); ++i) {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 375545448fb..04f7cea0314 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -391,6 +391,9 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
     if (_rows_returned_counter != nullptr) {
         COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     }
+    if (_peak_memory_usage_counter) {
+        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    }
     _closed = true;
     return Status::OK();
 }
@@ -427,6 +430,9 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
     _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
     info.parent_profile->add_child(_profile, true, nullptr);
     _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
+    _memory_used_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
+    _peak_memory_usage_counter =
+            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
     return Status::OK();
 }
 
@@ -442,6 +448,9 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
         COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
     }
     COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
+    if (_peak_memory_usage_counter) {
+        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    }
     _closed = true;
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 294eb962ee1..58c18db7038 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -396,6 +396,8 @@ protected:
     RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
     RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
     RuntimeProfile::Counter* _exec_timer = nullptr;
+    RuntimeProfile::Counter* _memory_used_counter = nullptr;
+    RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
     std::shared_ptr<Dependency> _finish_dependency;
 };
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index d5d460e80b9..ca0da254e98 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -32,6 +32,7 @@
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
+#include "util/defer_op.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
 #include "vec/core/materialize_block.h"
@@ -432,6 +433,7 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_n
 }
 
 void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
+    _mem_tracker->consume(block->allocated_bytes());
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->add_block(block, use_move);
 }
@@ -458,6 +460,7 @@ bool VDataStreamRecvr::ready_to_read() {
 
 Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
     _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    Defer release_mem([&]() { _mem_tracker->release(block->allocated_bytes()); });
     if (!_is_merging) {
         block->clear();
         return _sender_queues[0]->get_batch(block, eos);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to