This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 10483ea12c7 [fix](profile) fix error set with peak_memory_usage in pipeline #27749 10483ea12c7 is described below commit 10483ea12c7099a2dfcda46a428e5c52b33840ac Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Sat Dec 2 14:12:38 2023 +0800 [fix](profile) fix error set with peak_memory_usage in pipeline #27749 --- be/src/exec/exec_node.cpp | 6 +++--- be/src/pipeline/exec/aggregation_sink_operator.cpp | 12 ++++++------ be/src/pipeline/exec/exchange_sink_operator.cpp | 5 +---- be/src/pipeline/exec/exchange_sink_operator.h | 1 - be/src/pipeline/pipeline_x/dependency.h | 6 ------ be/src/pipeline/pipeline_x/operator.cpp | 9 +++++++++ be/src/pipeline/pipeline_x/operator.h | 2 ++ be/src/vec/runtime/vdata_stream_recvr.cpp | 3 +++ 8 files changed, 24 insertions(+), 20 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 354f80d3c10..4681218dcae 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -200,6 +200,9 @@ void ExecNode::release_resource(doris::RuntimeState* state) { _is_resource_released = true; } + if (_peak_memory_usage_counter) { + _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + } } Status ExecNode::close(RuntimeState* state) { @@ -218,9 +221,6 @@ Status ExecNode::close(RuntimeState* state) { result = st; } } - if (_peak_memory_usage_counter) { - _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); - } release_resource(state); LOG(INFO) << "query= " << print_id(state->query_id()) << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index c2bb041abb2..49cbe30a3b2 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -243,10 +243,9 @@ void AggSinkLocalState<DependencyType, 
Derived>::_update_memusage_with_serialize _agg_arena_pool->size() + Base::_shared_state->aggregate_data_container->memory_usage() - Base::_shared_state->mem_usage_record.used_in_arena; - Base::_shared_state->mem_tracker->consume(arena_memory_usage); - Base::_shared_state->mem_tracker->consume( - data.get_buffer_size_in_bytes() - - Base::_shared_state->mem_usage_record.used_in_state); + Base::_mem_tracker->consume(arena_memory_usage); + Base::_mem_tracker->consume(data.get_buffer_size_in_bytes() - + Base::_shared_state->mem_usage_record.used_in_state); _serialize_key_arena_memory_usage->add(arena_memory_usage); COUNTER_UPDATE(_hash_table_memory_usage, data.get_buffer_size_in_bytes() - @@ -438,7 +437,7 @@ template <typename DependencyType, typename Derived> void AggSinkLocalState<DependencyType, Derived>::_update_memusage_without_key() { auto arena_memory_usage = _agg_arena_pool->size() - Base::_shared_state->mem_usage_record.used_in_arena; - Base::_shared_state->mem_tracker->consume(arena_memory_usage); + Base::_mem_tracker->consume(arena_memory_usage); _serialize_key_arena_memory_usage->add(arena_memory_usage); Base::_shared_state->mem_usage_record.used_in_arena = _agg_arena_pool->size(); } @@ -877,7 +876,8 @@ Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state, St std::vector<char> tmp_deserialize_buffer; _deserialize_buffer.swap(tmp_deserialize_buffer); - + Base::_mem_tracker->release(Base::_shared_state->mem_usage_record.used_in_state + + Base::_shared_state->mem_usage_record.used_in_arena); return Base::close(state, exec_status); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 1c66ec02207..20d612d5785 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -123,9 +123,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _merge_block_timer = ADD_TIMER(profile(), 
"MergeBlockTime"); _local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES); _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); - _peak_memory_usage_counter = - _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); - static const std::string timer_name = "WaitForDependencyTime"; _wait_for_dependency_timer = ADD_TIMER(_profile, timer_name); _wait_queue_timer = ADD_CHILD_TIMER(_profile, "WaitForRpcBufferQueue", timer_name); @@ -312,7 +309,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows()); SCOPED_TIMER(local_state.exec_time_counter()); - local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption()); bool all_receiver_eof = true; for (auto channel : local_state.channels) { if (!channel->is_receiver_eof()) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 34502928880..048c8d3910c 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -219,7 +219,6 @@ private: RuntimeProfile::Counter* _local_bytes_send_counter = nullptr; RuntimeProfile::Counter* _merge_block_timer = nullptr; RuntimeProfile::Counter* _memory_usage_counter = nullptr; - RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; RuntimeProfile::Counter* _wait_queue_timer = nullptr; RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr; diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 9fbb25aaa2f..49dc327c796 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -353,13 +353,9 @@ public: int64_t used_in_state; }; 
MemoryRecord mem_usage_record; - std::unique_ptr<MemTracker> mem_tracker = std::make_unique<MemTracker>("AggregateOperator"); bool agg_data_created_without_key = false; private: - void _release_tracker() { - mem_tracker->release(mem_usage_record.used_in_state + mem_usage_record.used_in_arena); - } void _close_with_serialized_key() { std::visit( [&](auto&& agg_method) -> void { @@ -379,7 +375,6 @@ private: } }, agg_data->method_variant); - _release_tracker(); } void _close_without_key() { //because prepare maybe failed, and couldn't create agg data. @@ -389,7 +384,6 @@ private: static_cast<void>(_destroy_agg_status(agg_data->without_key)); agg_data_created_without_key = false; } - _release_tracker(); } Status _destroy_agg_status(vectorized::AggregateDataPtr data) { for (int i = 0; i < aggregate_evaluators.size(); ++i) { diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 375545448fb..04f7cea0314 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -391,6 +391,9 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) { if (_rows_returned_counter != nullptr) { COUNTER_SET(_rows_returned_counter, _num_rows_returned); } + if (_peak_memory_usage_counter) { + _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + } _closed = true; return Status::OK(); } @@ -427,6 +430,9 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state, _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1); info.parent_profile->add_child(_profile, true, nullptr); _mem_tracker = std::make_unique<MemTracker>(_parent->get_name()); + _memory_used_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); + _peak_memory_usage_counter = + _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); return Status::OK(); } @@ -442,6 +448,9 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu 
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); + if (_peak_memory_usage_counter) { + _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + } _closed = true; return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 294eb962ee1..58c18db7038 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -396,6 +396,8 @@ protected: RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr; RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; RuntimeProfile::Counter* _exec_timer = nullptr; + RuntimeProfile::Counter* _memory_used_counter = nullptr; + RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; std::shared_ptr<Dependency> _finish_dependency; }; diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index d5d460e80b9..ca0da254e98 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -32,6 +32,7 @@ #include "runtime/memory/mem_tracker.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" +#include "util/defer_op.h" #include "util/uid_util.h" #include "vec/core/block.h" #include "vec/core/materialize_block.h" @@ -432,6 +433,7 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_n } void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) { + _mem_tracker->consume(block->allocated_bytes()); int use_sender_id = _is_merging ? 
sender_id : 0; _sender_queues[use_sender_id]->add_block(block, use_move); } @@ -458,6 +460,7 @@ bool VDataStreamRecvr::ready_to_read() { Status VDataStreamRecvr::get_next(Block* block, bool* eos) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + Defer release_mem([&]() { _mem_tracker->release(block->allocated_bytes()); }); if (!_is_merging) { block->clear(); return _sender_queues[0]->get_batch(block, eos); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org