This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c0f2d0188bd9df794ba42c711c0b345ac9ba2438 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Mon Mar 11 10:56:12 2024 +0800 [feature](pipelineX) add mem control in local exchange sink (#31982) --- be/src/vec/runtime/vdata_stream_recvr.cpp | 29 ++++++++++++++++++----------- be/src/vec/runtime/vdata_stream_recvr.h | 6 ++++-- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 7ee7f419fff..9708be6bde9 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -98,17 +98,17 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block DCHECK(!_block_queue.empty()); auto [next_block, block_byte_size] = std::move(_block_queue.front()); - _recvr->update_blocks_memory_usage(-block_byte_size); + update_blocks_memory_usage(-block_byte_size); _block_queue.pop_front(); _record_debug_info(); - if (_block_queue.empty() && _dependency) { + if (_block_queue.empty() && _source_dependency) { if (!_is_cancelled && _num_remaining_senders > 0) { - _dependency->block(); - } - if (_local_channel_dependency) { - _local_channel_dependency->set_ready(); + _source_dependency->block(); } } + if (_local_channel_dependency) { + _local_channel_dependency->set_ready(); + } if (!_pending_closures.empty()) { auto closure_pair = _pending_closures.front(); @@ -124,12 +124,12 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block } void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() { - if (!_dependency) { + if (!_source_dependency) { return; } const bool should_wait = !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0; if (!should_wait) { - _dependency->set_ready(); + _source_dependency->set_ready(); } } @@ -202,7 +202,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num _pending_closures.emplace_back(*done, monotonicStopWatch); *done = nullptr; } - _recvr->update_blocks_memory_usage(block_byte_size); + update_blocks_memory_usage(block_byte_size); _data_arrival_cv.notify_one(); return Status::OK(); } @@ -249,7 +249,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { // should be done before the following logic, because the _lock will be released // by `iter->second->wait(l)`, after `iter->second->wait(l)` returns, _recvr may // have been closed and resouces in _recvr are released; - _recvr->update_blocks_memory_usage(block_mem_size); + update_blocks_memory_usage(block_mem_size); if (_recvr->exceeds_limit(0)) { // yiguolei // It is too tricky here, if the running thread is bthread then the tid may be wrong. @@ -485,6 +485,13 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) { } } +void VDataStreamRecvr::SenderQueue::update_blocks_memory_usage(int64_t size) { + _recvr->update_blocks_memory_usage(size); + if (_recvr->exceeds_limit(0)) { + _local_channel_dependency->block(); + } +} + void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) { _blocks_memory_usage->add(size); _blocks_memory_usage_current_value.fetch_add(size); @@ -546,7 +553,7 @@ void VDataStreamRecvr::PipSenderQueue::add_block(Block* block, bool use_move) { _record_debug_info(); try_set_dep_ready_without_lock(); COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); - _recvr->update_blocks_memory_usage(block_mem_size); + update_blocks_memory_usage(block_mem_size); _data_arrival_cv.notify_one(); } } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index fa13666533a..06b05d2a577 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -214,9 +214,11 @@ public: } void set_dependency(std::shared_ptr<pipeline::Dependency> dependency) { - _dependency = dependency; + _source_dependency = dependency; } + void update_blocks_memory_usage(int64_t size); + protected: friend class pipeline::ExchangeLocalState; Status _inner_get_batch_without_lock(Block* block, bool* eos); @@ -285,7 +287,7 @@ protected: std::deque<std::pair<google::protobuf::Closure*, MonotonicStopWatch>> _pending_closures; std::unordered_map<std::thread::id, std::unique_ptr<ThreadClosure>> _local_closure; - std::shared_ptr<pipeline::Dependency> _dependency; + std::shared_ptr<pipeline::Dependency> _source_dependency; std::shared_ptr<pipeline::Dependency> _local_channel_dependency; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org