This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c0f2d0188bd9df794ba42c711c0b345ac9ba2438
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Mon Mar 11 10:56:12 2024 +0800

     [feature](pipelineX) add mem control in local exchange sink (#31982)
---
 be/src/vec/runtime/vdata_stream_recvr.cpp | 29 ++++++++++++++++++-----------
 be/src/vec/runtime/vdata_stream_recvr.h   |  6 ++++--
 2 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 7ee7f419fff..9708be6bde9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -98,17 +98,17 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
 
     DCHECK(!_block_queue.empty());
     auto [next_block, block_byte_size] = std::move(_block_queue.front());
-    _recvr->update_blocks_memory_usage(-block_byte_size);
+    update_blocks_memory_usage(-block_byte_size);
     _block_queue.pop_front();
     _record_debug_info();
-    if (_block_queue.empty() && _dependency) {
+    if (_block_queue.empty() && _source_dependency) {
         if (!_is_cancelled && _num_remaining_senders > 0) {
-            _dependency->block();
-        }
-        if (_local_channel_dependency) {
-            _local_channel_dependency->set_ready();
+            _source_dependency->block();
         }
     }
+    if (_local_channel_dependency) {
+        _local_channel_dependency->set_ready();
+    }
 
     if (!_pending_closures.empty()) {
         auto closure_pair = _pending_closures.front();
@@ -124,12 +124,12 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
 }
 
 void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
-    if (!_dependency) {
+    if (!_source_dependency) {
         return;
     }
     const bool should_wait = !_is_cancelled && _block_queue.empty() && 
_num_remaining_senders > 0;
     if (!should_wait) {
-        _dependency->set_ready();
+        _source_dependency->set_ready();
     }
 }
 
@@ -202,7 +202,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_num
         _pending_closures.emplace_back(*done, monotonicStopWatch);
         *done = nullptr;
     }
-    _recvr->update_blocks_memory_usage(block_byte_size);
+    update_blocks_memory_usage(block_byte_size);
     _data_arrival_cv.notify_one();
     return Status::OK();
 }
@@ -249,7 +249,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
     // should be done before the following logic, because the _lock will be 
released
     // by `iter->second->wait(l)`, after `iter->second->wait(l)` returns, 
_recvr may
     // have been closed and resouces in _recvr are released;
-    _recvr->update_blocks_memory_usage(block_mem_size);
+    update_blocks_memory_usage(block_mem_size);
     if (_recvr->exceeds_limit(0)) {
         // yiguolei
         // It is too tricky here, if the running thread is bthread then the 
tid may be wrong.
@@ -485,6 +485,13 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) {
     }
 }
 
+void VDataStreamRecvr::SenderQueue::update_blocks_memory_usage(int64_t size) {
+    _recvr->update_blocks_memory_usage(size);
+    if (_recvr->exceeds_limit(0)) {
+        _local_channel_dependency->block();
+    }
+}
+
 void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
     _blocks_memory_usage->add(size);
     _blocks_memory_usage_current_value.fetch_add(size);
@@ -546,7 +553,7 @@ void VDataStreamRecvr::PipSenderQueue::add_block(Block* 
block, bool use_move) {
         _record_debug_info();
         try_set_dep_ready_without_lock();
         COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
-        _recvr->update_blocks_memory_usage(block_mem_size);
+        update_blocks_memory_usage(block_mem_size);
         _data_arrival_cv.notify_one();
     }
 }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index fa13666533a..06b05d2a577 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -214,9 +214,11 @@ public:
     }
 
     void set_dependency(std::shared_ptr<pipeline::Dependency> dependency) {
-        _dependency = dependency;
+        _source_dependency = dependency;
     }
 
+    void update_blocks_memory_usage(int64_t size);
+
 protected:
     friend class pipeline::ExchangeLocalState;
     Status _inner_get_batch_without_lock(Block* block, bool* eos);
@@ -285,7 +287,7 @@ protected:
     std::deque<std::pair<google::protobuf::Closure*, MonotonicStopWatch>> 
_pending_closures;
     std::unordered_map<std::thread::id, std::unique_ptr<ThreadClosure>> 
_local_closure;
 
-    std::shared_ptr<pipeline::Dependency> _dependency;
+    std::shared_ptr<pipeline::Dependency> _source_dependency;
     std::shared_ptr<pipeline::Dependency> _local_channel_dependency;
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to