This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 616c205bfc279e4d3c9b861c2698ee0e318be784
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Tue Sep 10 11:06:04 2024 +0800

    [improvement](spill) add counter for memory usage (#40572)

    Issue Number: close #xxx <!--Describe your changes.-->
---
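Every hunk below applies the same accounting pattern: the operator's local state exposes a MemoryUsage counter, the counter is increased when a block is queued or buffered (via ByteSizeLong()/allocated_bytes()), decreased when the block is handed off or released, and Defer-style scope guards net out transient allocations on the spill path. The sketch below is illustrative only and is not part of the commit; SimpleCounter, ScopeGuard, Block, and PendingQueue are hypothetical stand-ins, not the real RuntimeProfile::Counter, Defer, and vectorized::Block types.

// Illustrative sketch of the accounting pattern used throughout this patch.
// All types here are hypothetical stand-ins, not the Doris implementations.
#include <atomic>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

class SimpleCounter {                       // stand-in for RuntimeProfile::Counter
public:
    void update(int64_t delta) { value_ += delta; }
    void set(int64_t v) { value_ = v; }
    int64_t value() const { return value_; }

private:
    std::atomic<int64_t> value_ {0};
};

class ScopeGuard {                          // stand-in for Doris's Defer
public:
    explicit ScopeGuard(std::function<void()> fn) : fn_(std::move(fn)) {}
    ~ScopeGuard() { fn_(); }

private:
    std::function<void()> fn_;
};

struct Block {                              // stand-in for vectorized::Block
    std::vector<char> data;
    int64_t bytes() const { return static_cast<int64_t>(data.size()); }
};

class PendingQueue {
public:
    explicit PendingQueue(SimpleCounter* memory_used) : memory_used_(memory_used) {}

    // Enqueue: account for the block for as long as it sits in the buffer.
    void add_block(Block block) {
        memory_used_->update(block.bytes());
        queue_.push(std::move(block));
    }

    // Dequeue: release the accounting once the block leaves the buffer.
    bool take_block(Block* out) {
        if (queue_.empty()) {
            return false;
        }
        *out = std::move(queue_.front());
        queue_.pop();
        memory_used_->update(-out->bytes());
        return true;
    }

    // Spill path: a transient copy is counted only while it is alive, even on early return.
    void spill_one(Block block) {
        const int64_t tmp_bytes = block.bytes();
        memory_used_->update(tmp_bytes);
        ScopeGuard release([this, tmp_bytes] { memory_used_->update(-tmp_bytes); });
        // ... serialize `block` and write it to disk here ...
    }

private:
    SimpleCounter* memory_used_ = nullptr;
    std::queue<Block> queue_;
};

In the patch itself the same idea shows up as paired memory_used_counter()->update(+bytes) / update(-bytes) calls around the exchange queues and as Defer blocks around spill serialization.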
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 27 ++++++++++++++++++++--
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 27 ++++++++++++++++++++++
 be/src/pipeline/exec/exchange_source_operator.cpp  |  4 ++--
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  3 ++-
 be/src/pipeline/exec/operator.cpp                  | 10 ++++----
 be/src/pipeline/exec/operator.h                    |  5 ++++
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  2 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   | 24 ++++++++++++++++++-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  2 +-
 .../pipeline/exec/spill_sort_source_operator.cpp   |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 18 +++++++++++++++
 be/src/pipeline/pipeline_fragment_context.h        |  6 +++--
 be/src/runtime/workload_group/workload_group.cpp   | 15 +++++++-----
 be/src/vec/runtime/vdata_stream_mgr.cpp            |  7 +++---
 be/src/vec/runtime/vdata_stream_mgr.h              |  4 ++++
 be/src/vec/runtime/vdata_stream_recvr.cpp          | 17 +++++++-------
 be/src/vec/runtime/vdata_stream_recvr.h            |  7 +++---
 be/src/vec/sink/vdata_stream_sender.cpp            |  6 +++++
 be/src/vec/sink/vdata_stream_sender.h              |  2 ++
 be/src/vec/spill/spill_stream.h                    |  5 ++--
 be/src/vec/spill/spill_writer.cpp                  |  8 +++++++
 be/src/vec/spill/spill_writer.h                    | 13 +++++++----
 23 files changed, 172 insertions(+), 44 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 5027d7c10de..58ca10af644 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -162,6 +162,7 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
             RETURN_IF_ERROR(
                     BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
         }
+        _parent->memory_used_counter()->update(request.block->ByteSizeLong());
         _instance_to_package_queue[ins_id].emplace(std::move(request));
         _total_queue_size++;
         if (_queue_dependency && _total_queue_size > _queue_capacity) {
@@ -196,6 +197,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
             RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
                     request.block_holder->get_block()->be_exec_version()));
         }
+        _parent->memory_used_counter()->update(request.block_holder->get_block()->ByteSizeLong());
         _instance_to_broadcast_package_queue[ins_id].emplace(request);
     }
     if (send_now) {
@@ -291,6 +293,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
         }
         if (request.block) {
+            _parent->memory_used_counter()->update(-request.block->ByteSizeLong());
            static_cast<void>(brpc_request->release_block());
         }
         q.pop();
@@ -370,6 +373,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
         }
         if (request.block_holder->get_block()) {
+            _parent->memory_used_counter()->update(
+                    -request.block_holder->get_block()->ByteSizeLong());
             static_cast<void>(brpc_request->release_block());
         }
         broadcast_q.pop();
@@ -421,8 +426,26 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
     _turn_off_channel(id, true);
-    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
-    swap(empty, _instance_to_broadcast_package_queue[id]);
+    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
+            _instance_to_broadcast_package_queue[id];
+    for (; !broadcast_q.empty(); broadcast_q.pop()) {
+        _parent->memory_used_counter()->update(
+                -broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+    }
+    {
+        std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
+        swap(empty, broadcast_q);
+    }
+
+    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
+    for (; !q.empty(); q.pop()) {
+        _parent->memory_used_counter()->update(-q.front().block->ByteSizeLong());
+    }
+
+    {
+        std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
+        swap(empty, q);
+    }
 }
 
 bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 9e17a76d272..ea16ab2d4fb 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -458,6 +458,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             RETURN_IF_ERROR(
                     local_state._partitioner->do_partitioning(state, block, _mem_tracker.get()));
         }
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
+        }
         if (_part_type == TPartitionType::HASH_PARTITIONED) {
             RETURN_IF_ERROR(channel_add_rows(
                     state, local_state.channels, local_state._partition_count,
@@ -467,7 +471,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                     state, local_state.channel_shared_ptrs, local_state._partition_count,
                     local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
         }
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        local_state.memory_used_counter()->update(new_channel_mem_usage - old_channel_mem_usage);
     } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
+        }
         // check out of limit
         RETURN_IF_ERROR(local_state._send_new_partition_batch());
         std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
@@ -501,7 +514,16 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         }
         RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
                                                   channel2rows, convert_block.get(), eos));
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        local_state.memory_used_counter()->update(new_channel_mem_usage - old_channel_mem_usage);
     } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        int64_t old_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            old_channel_mem_usage += channel->mem_usage();
+        }
         {
             SCOPED_TIMER(local_state._split_block_hash_compute_timer);
             RETURN_IF_ERROR(
@@ -512,6 +534,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         RETURN_IF_ERROR(channel_add_rows_with_idx(
                 state, local_state.channels, local_state.channels.size(), assignments, block,
                 eos));
+        int64_t new_channel_mem_usage = 0;
+        for (const auto& channel : local_state.channels) {
+            new_channel_mem_usage += channel->mem_usage();
+        }
+        local_state.memory_used_counter()->update(new_channel_mem_usage - old_channel_mem_usage);
     } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
         // Control the number of channels according to the flow, thereby controlling the number of table sink writers.
         // 1. select channel
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 9db0bca0c43..88915ca2c8a 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -62,8 +62,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
-            state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
-            profile(), p.is_merging());
+            state, this, p.input_row_desc(), state->fragment_instance_id(), p.node_id(),
+            p.num_senders(), profile(), p.is_merging());
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
     metrics.resize(queues.size());
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index ae724549f71..0ddc533bcea 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -549,7 +549,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     if (local_state._should_build_hash_table) {
         // If eos or have already met a null value using short-circuit strategy, we do not need to pull
         // data from probe side.
-        local_state._build_side_mem_used += in_block->allocated_bytes();
 
         if (local_state._build_side_mutable_block.empty()) {
             auto tmp_build_block = vectorized::VectorizedUtils::create_empty_columnswithtypename(
@@ -582,6 +581,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
             SCOPED_TIMER(local_state._build_side_merge_block_timer);
             RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(
                     std::move(*in_block)));
+            local_state._build_side_mem_used =
+                    local_state._build_side_mutable_block.allocated_bytes();
         }
     }
 
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index cf1e82f57dd..6d25f75da5d 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -489,9 +489,9 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
     _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
     _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
     _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name());
-    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", 1);
-    _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
-            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
+    _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", TUnit::BYTES, 1);
+    _peak_memory_usage_counter =
+            _runtime_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "", 1);
     return Status::OK();
 }
 
@@ -569,9 +569,9 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
     _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
     info.parent_profile->add_child(_profile, true, nullptr);
     _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
-    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 1);
+    _memory_used_counter = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", TUnit::BYTES, 1);
     _peak_memory_usage_counter =
-            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
+            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "", 1);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d111a2a7e24..06715343a0e 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -162,6 +162,8 @@ public:
     RuntimeProfile::Counter* rows_returned_counter() { return _rows_returned_counter; }
     RuntimeProfile::Counter* blocks_returned_counter() { return _blocks_returned_counter; }
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
+    RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; }
+    RuntimeProfile::Counter* peak_memory_usage_counter() { return _peak_memory_usage_counter; }
     OperatorXBase* parent() { return _parent; }
     RuntimeState* state() { return _state; }
     vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
@@ -372,6 +374,9 @@ public:
     RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; }
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
+    RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; }
+    RuntimeProfile::Counter* peak_memory_usage_counter() { return _peak_memory_usage_counter; }
+
     virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
 
     // override in exchange sink , AsyncWriterSink
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 65c1cefa63b..a7929337b63 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -150,7 +150,7 @@ public:
         spill_stream->set_write_counters(Base::_spill_serialize_block_timer,
                                          Base::_spill_block_count, Base::_spill_data_size,
                                          Base::_spill_write_disk_timer,
-                                         Base::_spill_write_wait_io_timer);
+                                         Base::_spill_write_wait_io_timer, memory_used_counter());
 
         status = to_block(context, keys, values, null_key_data);
         RETURN_IF_ERROR(status);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 51b6e143b3c..f1ca4b8f7b5 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -194,7 +194,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
         RETURN_IF_ERROR(spilling_stream->prepare_spill());
         spilling_stream->set_write_counters(
                 _spill_serialize_block_timer, _spill_block_count, _spill_data_size,
-                _spill_write_disk_timer, _spill_write_wait_io_timer);
+                _spill_write_disk_timer, _spill_write_wait_io_timer, memory_used_counter());
     }
 
     COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a073d922769..7e106db5358 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -70,7 +70,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
         RETURN_IF_ERROR(spilling_stream->prepare_spill());
         spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count,
                                             _spill_data_size, _spill_write_disk_timer,
-                                            _spill_write_wait_io_timer);
+                                            _spill_write_wait_io_timer, memory_used_counter());
     }
     return p._partitioner->clone(state, _partitioner);
 }
@@ -132,10 +132,12 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
     auto row_desc = p._child->row_desc();
     const auto num_slots = row_desc.num_slots();
     vectorized::Block build_block;
+    size_t block_old_mem = 0;
     auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
     if (inner_sink_state_) {
         auto inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
         build_block = inner_sink_state->_build_side_mutable_block.to_block();
+        block_old_mem = build_block.allocated_bytes();
     }
 
     if (build_block.rows() <= 1) {
@@ -146,9 +148,11 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
 
     if (build_block.columns() > num_slots) {
         build_block.erase(num_slots);
+        memory_used_counter()->update(build_block.allocated_bytes() - block_old_mem);
     }
 
     auto spill_func = [build_block = std::move(build_block), state, this]() mutable {
+        Defer defer {[&]() { memory_used_counter()->set((int64_t)revocable_mem_size(state)); }};
         auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
         auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
         std::vector<std::vector<uint32_t>> partitions_indexes(p._partition_count);
@@ -160,6 +164,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         auto flush_rows = [&state, this](std::unique_ptr<vectorized::MutableBlock>& partition_block,
                                          vectorized::SpillStreamSPtr& spilling_stream) {
             auto block = partition_block->to_block();
+            Defer defer {[&]() { memory_used_counter()->update(-block.allocated_bytes()); }};
             auto status = spilling_stream->spill_block(state, block, false);
 
             if (!status.ok()) {
@@ -182,6 +187,9 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                 sub_block.get_by_position(i).column =
                         build_block.get_by_position(i).column->cut(offset, this_run);
             }
+            auto sub_blocks_memory_usage = sub_block.allocated_bytes();
+            memory_used_counter()->update(sub_blocks_memory_usage);
+            Defer defer {[&]() { memory_used_counter()->update(-sub_blocks_memory_usage); }};
 
             offset += this_run;
             const auto is_last_block = offset == total_rows;
@@ -206,10 +214,12 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                         partition_block =
                                 vectorized::MutableBlock::create_unique(build_block.clone_empty());
                     }
+                    auto old_mem = partition_block->allocated_bytes();
                     {
                         SCOPED_TIMER(_partition_shuffle_timer);
                         Status st = partition_block->add_rows(&sub_block, begin, end);
+                        memory_used_counter()->update(partition_block->allocated_bytes() - old_mem);
                         if (!st.ok()) {
                             std::unique_lock<std::mutex> lock(_spill_lock);
                             _spill_status = st;
@@ -226,6 +236,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                         }
                         partition_block =
                                 vectorized::MutableBlock::create_unique(build_block.clone_empty());
+                        memory_used_counter()->update(partition_block->allocated_bytes());
                     }
                 }
             }
@@ -368,6 +379,7 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
     if (!rows) {
         return Status::OK();
     }
+    Defer defer {[&]() { memory_used_counter()->set((int64_t)revocable_mem_size(state)); }};
     {
         /// TODO: DO NOT execute build exprs twice(when partition and building hash table)
         SCOPED_TIMER(_partition_timer);
@@ -407,7 +419,10 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
     if (_spill_status_ok) {
         auto block = partitioned_block->to_block();
+        auto block_mem_usage = block.allocated_bytes();
+        Defer defer {[&]() { memory_used_counter()->update(-block_mem_usage); }};
         partitioned_block = vectorized::MutableBlock::create_unique(block.clone_empty());
+        memory_used_counter()->update(partitioned_block->allocated_bytes());
         auto st = spilling_stream->spill_block(state(), block, false);
         if (!st.ok()) {
             _spill_status_ok = false;
@@ -547,6 +562,10 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                             "fault_inject partitioned_hash_join_sink "
                             "sink_eos failed");
                 });
+                Defer defer {[&]() {
+                    local_state.memory_used_counter()->set(
+                            (int64_t)local_state.revocable_mem_size(state));
+                }};
                 RETURN_IF_ERROR(_inner_sink_operator->sink(
                         local_state._shared_state->inner_runtime_state.get(), in_block, eos));
                 VLOG_DEBUG << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
@@ -584,6 +603,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                     "fault_inject partitioned_hash_join_sink "
                     "sink failed");
         });
+        Defer defer {[&]() {
+            local_state.memory_used_counter()->set((int64_t)local_state.revocable_mem_size(state));
+        }};
         RETURN_IF_ERROR(_inner_sink_operator->sink(
                 local_state._shared_state->inner_runtime_state.get(), in_block, eos));
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 9dcb66240df..5a37ac2f5ab 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -200,7 +200,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
     _spilling_stream->set_write_counters(
             Base::_spill_serialize_block_timer, Base::_spill_block_count, Base::_spill_data_size,
-            Base::_spill_write_disk_timer, Base::_spill_write_wait_io_timer);
+            Base::_spill_write_disk_timer, Base::_spill_write_wait_io_timer, memory_used_counter());
 
     status = _spilling_stream->prepare_spill();
     RETURN_IF_ERROR(status);
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 601188ae02e..8e601a5c7a5 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -148,7 +148,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         bool eos = false;
         tmp_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count,
                                        _spill_data_size, _spill_write_disk_timer,
-                                       _spill_write_wait_io_timer);
+                                       _spill_write_wait_io_timer, memory_used_counter());
         while (!eos && !state->is_cancelled()) {
             merge_sorted_block.clear_column_data();
             {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 5d1b6aaaa1b..db7310aa80d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -187,6 +187,9 @@ void PipelineFragmentContext::cancel(const Status reason) {
                    this->debug_string());
     }
+    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
+        print_profile("cancel pipeline, reason: " + reason.to_string());
+    }
     _query_ctx->cancel(reason, _fragment_id);
     if (reason.is<ErrorCode::LIMIT_REACH>()) {
         _is_report_on_cancel = false;
@@ -1721,6 +1724,21 @@ Status PipelineFragmentContext::submit() {
     }
 }
 
+void PipelineFragmentContext::print_profile(const std::string& extra_info) {
+    if (_runtime_state->enable_profile()) {
+        std::stringstream ss;
+        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
+            runtime_profile_ptr->pretty_print(&ss);
+        }
+
+        if (_runtime_state->load_channel_profile()) {
+            _runtime_state->load_channel_profile()->pretty_print(&ss);
+        }
+
+        LOG_INFO("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
+                 this->_fragment_id, extra_info, ss.str());
+    }
+}
 // If all pipeline tasks binded to the fragment instance are finished, then we could
 // close the fragment instance.
 void PipelineFragmentContext::_close_fragment_instance() {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index a2c55214ba3..c0924be38b6 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -69,8 +69,10 @@ public:
 
     ~PipelineFragmentContext();
 
-    std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile() const;
-    std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile() const;
+    void print_profile(const std::string& extra_info);
+
+    std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile_x() const;
+    std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile_x() const;
 
     bool is_timeout(timespec now) const;
 
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 6f3b51f09fd..4d53ffc9686 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -98,17 +98,20 @@ std::string WorkloadGroup::debug_string() const {
 }
 
 std::string WorkloadGroup::memory_debug_string() const {
+    auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
+    auto mem_used_ratio = realtime_total_mem_used / (double)_weighted_memory_limit;
     return fmt::format(
             "TG[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = "
-            "{}, weighted_memory_limit = {}, total_mem_used = {}, "
-            "wg_refresh_interval_memory_growth = {}, spill_low_watermark = {}, "
+            "{}, weighted_memory_limit = {}, total_mem_used = {},"
+            "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, spill_low_watermark = "
+            "{}, "
            "spill_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
            _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
"true" : "false", - PrettyPrinter::print(_weighted_memory_limit, TUnit::BYTES), - PrettyPrinter::print(_total_mem_used, TUnit::BYTES), - PrettyPrinter::print(_wg_refresh_interval_memory_growth, TUnit::BYTES), - _spill_low_watermark, _spill_high_watermark, _version, _is_shutdown, + PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES), + PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES), + PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), TUnit::BYTES), + mem_used_ratio, _spill_low_watermark, _spill_high_watermark, _version, _is_shutdown, _query_ctxs.size()); } diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index a5db9a6150d..e248b074f67 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -61,12 +61,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instanc } std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr( - RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging) { + RuntimeState* state, pipeline::ExchangeLocalState* parent, const RowDescriptor& row_desc, + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, + RuntimeProfile* profile, bool is_merging) { DCHECK(profile != nullptr); VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id) << ", node=" << dest_node_id; - std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, state, row_desc, + std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(this, parent, state, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging, profile)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index 09e347fcfb2..bd5e6f9b91e 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -40,6 +40,9 @@ class RuntimeState; class RowDescriptor; class RuntimeProfile; class PTransmitDataParams; +namespace pipeline { +class ExchangeLocalState; +} namespace vectorized { class VDataStreamRecvr; @@ -50,6 +53,7 @@ public: ~VDataStreamMgr(); std::shared_ptr<VDataStreamRecvr> create_recvr(RuntimeState* state, + pipeline::ExchangeLocalState* parent, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 1ca6bb7f2c5..d2f79b8529e 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -96,6 +96,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block DCHECK(!_block_queue.empty()); auto [next_block, block_byte_size] = std::move(_block_queue.front()); _block_queue.pop_front(); + _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size); sub_blocks_memory_usage(block_byte_size); _record_debug_info(); if (_block_queue.empty() && _source_dependency) { @@ -207,6 +208,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num _pending_closures.emplace_back(*done, monotonicStopWatch); *done = nullptr; } + _recvr->_parent->memory_used_counter()->update(block_byte_size); add_blocks_memory_usage(block_byte_size); return Status::OK(); } @@ -245,6 +247,7 @@ void 
@@ -245,6 +247,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
         _record_debug_info();
         try_set_dep_ready_without_lock();
         COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
+        _recvr->_parent->memory_used_counter()->update(block_mem_size);
         add_blocks_memory_usage(block_mem_size);
     }
 }
@@ -315,12 +318,13 @@ void VDataStreamRecvr::SenderQueue::close() {
     _block_queue.clear();
 }
 
-VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state,
-                                   const RowDescriptor& row_desc,
+VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* parent,
+                                   RuntimeState* state, const RowDescriptor& row_desc,
                                    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
                                    int num_senders, bool is_merging, RuntimeProfile* profile)
         : HasTaskExecutionCtx(state),
          _mgr(stream_mgr),
+         _parent(parent),
          _query_thread_context(state->query_id(), state->query_mem_tracker(),
                                state->get_query_ctx()->workload_group()),
          _fragment_instance_id(fragment_instance_id),
@@ -352,9 +356,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta
     }
 
     // Initialize the counters
-    _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
-    _peak_memory_usage_counter =
-            _profile->add_counter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
     _remote_bytes_received_counter = ADD_COUNTER(_profile, "RemoteBytesReceived", TUnit::BYTES);
     _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES);
 
@@ -417,7 +418,7 @@ std::shared_ptr<pipeline::Dependency> VDataStreamRecvr::get_local_channel_depend
 }
 
 Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
-    _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    _parent->peak_memory_usage_counter()->set(_mem_tracker->peak_consumption());
     if (!_is_merging) {
         block->clear();
         return _sender_queues[0]->get_batch(block, eos);
@@ -492,8 +493,8 @@ void VDataStreamRecvr::close() {
     _mgr = nullptr;
 
     _merger.reset();
-    if (_peak_memory_usage_counter) {
-        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+    if (_parent->peak_memory_usage_counter()) {
+        _parent->peak_memory_usage_counter()->set(_mem_tracker->peak_consumption());
     }
 }
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index e8dcfdedba5..b2d76590ba2 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -69,7 +69,8 @@ class VDataStreamRecvr;
 class VDataStreamRecvr : public HasTaskExecutionCtx {
 public:
     class SenderQueue;
-    VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& row_desc,
+    VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::ExchangeLocalState* parent,
+                     RuntimeState* state, const RowDescriptor& row_desc,
                      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
                      int num_senders, bool is_merging, RuntimeProfile* profile);
 
@@ -120,6 +121,8 @@ private:
     // DataStreamMgr instance used to create this recvr. (Not owned)
     VDataStreamMgr* _mgr = nullptr;
 
+    pipeline::ExchangeLocalState* _parent = nullptr;
+
     QueryThreadContext _query_thread_context;
 
     // Fragment and node id of the destination exchange node this receiver is used by.
@@ -152,8 +155,6 @@ private:
     RuntimeProfile::Counter* _data_arrival_timer = nullptr;
     RuntimeProfile::Counter* _decompress_timer = nullptr;
     RuntimeProfile::Counter* _decompress_bytes = nullptr;
-    RuntimeProfile::Counter* _memory_usage_counter = nullptr;
-    RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
     // Number of rows received
     RuntimeProfile::Counter* _rows_produced_counter = nullptr;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index b6bfc7862dc..f894df25153 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -140,6 +140,12 @@ std::shared_ptr<pipeline::Dependency> PipChannel::get_local_channel_dependency()
             Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());
 }
 
+int64_t PipChannel::mem_usage() const {
+    auto* mutable_block = Channel<pipeline::ExchangeSinkLocalState>::_serializer.get_block();
+    int64_t mem_usage = mutable_block ? mutable_block->allocated_bytes() : 0;
+    return mem_usage;
+}
+
 Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status) {
     COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(), 1);
     std::unique_ptr<PBlock> pblock_ptr;
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 0ceec97f1fc..6cd1ad9ac49 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -275,6 +275,8 @@ public:
 
     ~PipChannel() override { delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
 
+    int64_t mem_usage() const;
+
     void ch_roll_pb_block() override {
         // We have two choices here.
         // 1. Use a PBlock pool and fetch an available PBlock if we need one. In this way, we can
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 7a4bb4980b1..26b7dcbaf06 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -68,9 +68,10 @@ public:
                             RuntimeProfile::Counter* write_block_counter,
                             RuntimeProfile::Counter* write_bytes_counter,
                             RuntimeProfile::Counter* write_timer,
-                            RuntimeProfile::Counter* wait_io_timer) {
+                            RuntimeProfile::Counter* wait_io_timer,
+                            RuntimeProfile::Counter* memory_used_counter) {
         writer_->set_counters(serialize_timer, write_block_counter, write_bytes_counter,
-                              write_timer);
+                              write_timer, memory_used_counter);
         write_wait_io_timer_ = wait_io_timer;
     }
 
diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp
index 46a97285802..9fbd81601b6 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -84,6 +84,9 @@ Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& writt
             }
         });
 
+        auto tmp_blcok_mem = tmp_block.allocated_bytes();
+        memory_used_counter_->update(tmp_blcok_mem);
+        Defer defer {[&]() { memory_used_counter_->update(-tmp_blcok_mem); }};
         RETURN_IF_ERROR(_write_internal(tmp_block, written_bytes));
 
         row_idx += block_rows;
@@ -107,10 +110,14 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) {
                                           &compressed_bytes, segment_v2::CompressionTypePB::ZSTD);
         // ZSTD for better compression ratio
         RETURN_IF_ERROR(status);
+        auto pblock_mem = pblock.ByteSizeLong();
+        memory_used_counter_->update(pblock_mem);
+        Defer defer {[&]() { memory_used_counter_->update(-pblock_mem); }};
        if (!pblock.SerializeToString(&buff)) {
            return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
[path={}]", file_path_); } + memory_used_counter_->update(buff.size()); } if (data_dir_->reach_capacity_limit(buff.size())) { return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>( @@ -124,6 +131,7 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { { auto buff_size = buff.size(); Defer defer {[&]() { + memory_used_counter_->update(-buff_size); if (status.ok()) { data_dir_->update_spill_data_usage(buff_size); diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h index d77bbd6908c..c3502b5d6a4 100644 --- a/be/src/vec/spill/spill_writer.h +++ b/be/src/vec/spill/spill_writer.h @@ -51,11 +51,13 @@ public: void set_counters(RuntimeProfile::Counter* serialize_timer, RuntimeProfile::Counter* write_block_counter, RuntimeProfile::Counter* write_bytes_counter, - RuntimeProfile::Counter* write_timer) { + RuntimeProfile::Counter* write_timer, + RuntimeProfile::Counter* memory_used_counter) { serialize_timer_ = serialize_timer; write_block_counter_ = write_block_counter; write_bytes_counter_ = write_bytes_counter; write_timer_ = write_timer; + memory_used_counter_ = memory_used_counter; } private: @@ -78,10 +80,11 @@ private: int64_t total_written_bytes_ = 0; std::string meta_; - RuntimeProfile::Counter* write_bytes_counter_; - RuntimeProfile::Counter* serialize_timer_; - RuntimeProfile::Counter* write_timer_; - RuntimeProfile::Counter* write_block_counter_; + RuntimeProfile::Counter* write_bytes_counter_ = nullptr; + RuntimeProfile::Counter* serialize_timer_ = nullptr; + RuntimeProfile::Counter* write_timer_ = nullptr; + RuntimeProfile::Counter* write_block_counter_ = nullptr; + RuntimeProfile::Counter* memory_used_counter_ = nullptr; }; using SpillWriterUPtr = std::unique_ptr<SpillWriter>; } // namespace vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org