This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 04e993c1de8 [refine](pipeline) refine some VDataStreamRecvr code (#35063) (#37802) 04e993c1de8 is described below commit 04e993c1de8802a3dbea44710e399c04b6aae5ff Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Thu Aug 22 19:55:17 2024 +0800 [refine](pipeline) refine some VDataStreamRecvr code (#35063) (#37802) ## Proposed changes https://github.com/apache/doris/pull/35063 https://github.com/apache/doris/pull/35428 --- be/src/vec/runtime/vdata_stream_recvr.cpp | 60 ++++++++++++++++++------------- be/src/vec/runtime/vdata_stream_recvr.h | 20 +++++------ 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index cb483e986c8..912ecf53989 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -49,6 +49,7 @@ VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int n _num_remaining_senders(num_senders), _received_first_batch(false) { _cancel_status = Status::OK(); + _queue_mem_tracker = std::make_unique<MemTracker>("local data queue mem tracker"); } VDataStreamRecvr::SenderQueue::~SenderQueue() { @@ -98,17 +99,14 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block DCHECK(!_block_queue.empty()); auto [next_block, block_byte_size] = std::move(_block_queue.front()); - update_blocks_memory_usage(-block_byte_size); _block_queue.pop_front(); + sub_blocks_memory_usage(block_byte_size); _record_debug_info(); if (_block_queue.empty() && _source_dependency) { if (!_is_cancelled && _num_remaining_senders > 0) { _source_dependency->block(); } } - if (_local_channel_dependency) { - _local_channel_dependency->set_ready(); - } if (!_pending_closures.empty()) { auto closure_pair = _pending_closures.front(); @@ -136,9 +134,6 @@ void 
VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() { Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done) { - const auto pblock_byte_size = pblock.ByteSizeLong(); - COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size); - { std::lock_guard<std::mutex> l(_lock); if (_is_cancelled) { @@ -191,6 +186,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1); _block_queue.emplace_back(std::move(block), block_byte_size); + COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size); _record_debug_info(); try_set_dep_ready_without_lock(); @@ -202,7 +198,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num _pending_closures.emplace_back(*done, monotonicStopWatch); *done = nullptr; } - update_blocks_memory_usage(block_byte_size); + add_blocks_memory_usage(block_byte_size); _data_arrival_cv.notify_one(); return Status::OK(); } @@ -216,7 +212,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { } } - auto block_bytes_received = block->bytes(); // Has to use unique ptr here, because clone column may failed if allocate memory failed. 
BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name()); @@ -236,11 +231,11 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { if (_is_cancelled) { return; } - COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_bytes_received); COUNTER_UPDATE(_recvr->_rows_produced_counter, rows); COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1); _block_queue.emplace_back(std::move(nblock), block_mem_size); + COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); _record_debug_info(); try_set_dep_ready_without_lock(); _data_arrival_cv.notify_one(); @@ -249,7 +244,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { // should be done before the following logic, because the _lock will be released // by `iter->second->wait(l)`, after `iter->second->wait(l)` returns, _recvr may // have been closed and resouces in _recvr are released; - update_blocks_memory_usage(block_mem_size); + add_blocks_memory_usage(block_mem_size); if (_recvr->exceeds_limit(0)) { // yiguolei // It is too tricky here, if the running thread is bthread then the tid may be wrong. @@ -347,7 +342,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta _is_closed(false), _profile(profile), _peak_memory_usage_counter(nullptr), - _enable_pipeline(state->enable_pipeline_exec()) { + _enable_pipeline(state->enable_pipeline_x_exec()) { // DataStreamRecvr may be destructed after the instance execution thread ends. _mem_tracker = std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id)); @@ -364,6 +359,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta } _sender_queues.reserve(num_queues); int num_sender_per_queue = is_merging ? 
1 : num_senders; + _sender_queue_mem_limit = std::max(20480, config::exchg_node_buffer_size_bytes / num_queues); for (int i = 0; i < num_queues; ++i) { SenderQueue* queue = nullptr; if (_enable_pipeline) { @@ -379,10 +375,9 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta // Initialize the counters _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); - _blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage"); _peak_memory_usage_counter = - _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); - _bytes_received_counter = ADD_COUNTER(_profile, "BytesReceived", TUnit::BYTES); + _profile->add_counter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); + _remote_bytes_received_counter = ADD_COUNTER(_profile, "RemoteBytesReceived", TUnit::BYTES); _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES); _deserialize_row_batch_timer = ADD_TIMER(_profile, "DeserializeRowBatchTimer"); @@ -428,7 +423,6 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_n } void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) { - _mem_tracker->consume(block->allocated_bytes()); int use_sender_id = _is_merging ? 
sender_id : 0; _sender_queues[use_sender_id]->add_block(block, use_move); } @@ -455,7 +449,6 @@ bool VDataStreamRecvr::ready_to_read() { Status VDataStreamRecvr::get_next(Block* block, bool* eos) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); - Defer release_mem([&]() { _mem_tracker->release(block->allocated_bytes()); }); if (!_is_merging) { block->clear(); return _sender_queues[0]->get_batch(block, eos); @@ -482,16 +475,35 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) { } } -void VDataStreamRecvr::SenderQueue::update_blocks_memory_usage(int64_t size) { - _recvr->update_blocks_memory_usage(size); - if (_local_channel_dependency && _recvr->exceeds_limit(0)) { +void VDataStreamRecvr::SenderQueue::add_blocks_memory_usage(int64_t size) { + DCHECK(size >= 0); + _recvr->_mem_tracker->consume(size); + _queue_mem_tracker->consume(size); + if (_local_channel_dependency && exceeds_limit()) { _local_channel_dependency->block(); } } -void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) { - _blocks_memory_usage->add(size); - _blocks_memory_usage_current_value.fetch_add(size); +void VDataStreamRecvr::SenderQueue::sub_blocks_memory_usage(int64_t size) { + DCHECK(size >= 0); + _recvr->_mem_tracker->release(size); + _queue_mem_tracker->release(size); + if (_local_channel_dependency && (!exceeds_limit())) { + _local_channel_dependency->set_ready(); + } +} + +bool VDataStreamRecvr::SenderQueue::exceeds_limit() { + const size_t queue_byte_size = _queue_mem_tracker->consumption(); + return _recvr->queue_exceeds_limit(queue_byte_size); +} + +bool VDataStreamRecvr::exceeds_limit(size_t block_byte_size) { + return _mem_tracker->consumption() + block_byte_size > config::exchg_node_buffer_size_bytes; +} + +bool VDataStreamRecvr::queue_exceeds_limit(size_t queue_byte_size) const { + return queue_byte_size >= _sender_queue_mem_limit; } void VDataStreamRecvr::close() { @@ -550,7 +562,7 @@ void VDataStreamRecvr::PipSenderQueue::add_block(Block* 
block, bool use_move) { _record_debug_info(); try_set_dep_ready_without_lock(); COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); - update_blocks_memory_usage(block_mem_size); + add_blocks_memory_usage(block_mem_size); _data_arrival_cv.notify_one(); } } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index cb44565e8c2..d447e5686e9 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -111,17 +111,13 @@ public: // Careful: stream sender will call this function for a local receiver, // accessing members of receiver that are allocated by Object pool // in this function is not safe. - bool exceeds_limit(int batch_size) { - return _blocks_memory_usage_current_value + batch_size > - config::exchg_node_buffer_size_bytes; - } - + bool exceeds_limit(size_t block_byte_size); + bool queue_exceeds_limit(size_t byte_size) const; bool is_closed() const { return _is_closed; } std::shared_ptr<pipeline::Dependency> get_local_channel_dependency(int sender_id); private: - void update_blocks_memory_usage(int64_t size); class PipSenderQueue; friend struct BlockSupplierSortCursorImpl; @@ -146,13 +142,14 @@ private: std::unique_ptr<MemTracker> _mem_tracker; // Managed by object pool std::vector<SenderQueue*> _sender_queues; + size_t _sender_queue_mem_limit; std::unique_ptr<VSortedRunMerger> _merger; ObjectPool _sender_queue_pool; RuntimeProfile* _profile = nullptr; - RuntimeProfile::Counter* _bytes_received_counter = nullptr; + RuntimeProfile::Counter* _remote_bytes_received_counter = nullptr; RuntimeProfile::Counter* _local_bytes_received_counter = nullptr; RuntimeProfile::Counter* _deserialize_row_batch_timer = nullptr; RuntimeProfile::Counter* _first_batch_wait_total_timer = nullptr; @@ -161,8 +158,6 @@ private: RuntimeProfile::Counter* _decompress_timer = nullptr; RuntimeProfile::Counter* _decompress_bytes = nullptr; RuntimeProfile::Counter* _memory_usage_counter = 
nullptr; - RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr; - std::atomic<int64_t> _blocks_memory_usage_current_value = 0; RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; // Number of rows received @@ -222,7 +217,11 @@ public: _source_dependency = dependency; } - void update_blocks_memory_usage(int64_t size); + void add_blocks_memory_usage(int64_t size); + + void sub_blocks_memory_usage(int64_t size); + + bool exceeds_limit(); protected: friend class pipeline::ExchangeLocalState; @@ -282,6 +281,7 @@ protected: int _num_remaining_senders; std::condition_variable _data_arrival_cv; std::condition_variable _data_removal_cv; + std::unique_ptr<MemTracker> _queue_mem_tracker; std::list<std::pair<BlockUPtr, size_t>> _block_queue; bool _received_first_batch; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org