This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 04e993c1de8 [refine](pipeline) refine some VDataStreamRecvr code  
(#35063) (#37802)
04e993c1de8 is described below

commit 04e993c1de8802a3dbea44710e399c04b6aae5ff
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Thu Aug 22 19:55:17 2024 +0800

    [refine](pipeline) refine some VDataStreamRecvr code  (#35063) (#37802)
    
    ## Proposed changes
    https://github.com/apache/doris/pull/35063
    https://github.com/apache/doris/pull/35428
---
 be/src/vec/runtime/vdata_stream_recvr.cpp | 60 ++++++++++++++++++-------------
 be/src/vec/runtime/vdata_stream_recvr.h   | 20 +++++------
 2 files changed, 46 insertions(+), 34 deletions(-)

diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index cb483e986c8..912ecf53989 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -49,6 +49,7 @@ VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* 
parent_recvr, int n
           _num_remaining_senders(num_senders),
           _received_first_batch(false) {
     _cancel_status = Status::OK();
+    _queue_mem_tracker = std::make_unique<MemTracker>("local data queue mem 
tracker");
 }
 
 VDataStreamRecvr::SenderQueue::~SenderQueue() {
@@ -98,17 +99,14 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
 
     DCHECK(!_block_queue.empty());
     auto [next_block, block_byte_size] = std::move(_block_queue.front());
-    update_blocks_memory_usage(-block_byte_size);
     _block_queue.pop_front();
+    sub_blocks_memory_usage(block_byte_size);
     _record_debug_info();
     if (_block_queue.empty() && _source_dependency) {
         if (!_is_cancelled && _num_remaining_senders > 0) {
             _source_dependency->block();
         }
     }
-    if (_local_channel_dependency) {
-        _local_channel_dependency->set_ready();
-    }
 
     if (!_pending_closures.empty()) {
         auto closure_pair = _pending_closures.front();
@@ -136,9 +134,6 @@ void 
VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
 Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int 
be_number,
                                                 int64_t packet_seq,
                                                 ::google::protobuf::Closure** 
done) {
-    const auto pblock_byte_size = pblock.ByteSizeLong();
-    COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size);
-
     {
         std::lock_guard<std::mutex> l(_lock);
         if (_is_cancelled) {
@@ -191,6 +186,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_num
     COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
 
     _block_queue.emplace_back(std::move(block), block_byte_size);
+    COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
     _record_debug_info();
     try_set_dep_ready_without_lock();
 
@@ -202,7 +198,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_num
         _pending_closures.emplace_back(*done, monotonicStopWatch);
         *done = nullptr;
     }
-    update_blocks_memory_usage(block_byte_size);
+    add_blocks_memory_usage(block_byte_size);
     _data_arrival_cv.notify_one();
     return Status::OK();
 }
@@ -216,7 +212,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
         }
     }
 
-    auto block_bytes_received = block->bytes();
     // Has to use unique ptr here, because clone column may failed if allocate 
memory failed.
     BlockUPtr nblock = 
Block::create_unique(block->get_columns_with_type_and_name());
 
@@ -236,11 +231,11 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* 
block, bool use_move) {
     if (_is_cancelled) {
         return;
     }
-    COUNTER_UPDATE(_recvr->_local_bytes_received_counter, 
block_bytes_received);
     COUNTER_UPDATE(_recvr->_rows_produced_counter, rows);
     COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
 
     _block_queue.emplace_back(std::move(nblock), block_mem_size);
+    COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
     _record_debug_info();
     try_set_dep_ready_without_lock();
     _data_arrival_cv.notify_one();
@@ -249,7 +244,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
     // should be done before the following logic, because the _lock will be 
released
     // by `iter->second->wait(l)`, after `iter->second->wait(l)` returns, 
_recvr may
     // have been closed and resouces in _recvr are released;
-    update_blocks_memory_usage(block_mem_size);
+    add_blocks_memory_usage(block_mem_size);
     if (_recvr->exceeds_limit(0)) {
         // yiguolei
         // It is too tricky here, if the running thread is bthread then the 
tid may be wrong.
@@ -347,7 +342,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* 
stream_mgr, RuntimeState* sta
           _is_closed(false),
           _profile(profile),
           _peak_memory_usage_counter(nullptr),
-          _enable_pipeline(state->enable_pipeline_exec()) {
+          _enable_pipeline(state->enable_pipeline_x_exec()) {
     // DataStreamRecvr may be destructed after the instance execution thread 
ends.
     _mem_tracker =
             std::make_unique<MemTracker>("VDataStreamRecvr:" + 
print_id(_fragment_instance_id));
@@ -364,6 +359,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* 
stream_mgr, RuntimeState* sta
     }
     _sender_queues.reserve(num_queues);
     int num_sender_per_queue = is_merging ? 1 : num_senders;
+    _sender_queue_mem_limit = std::max(20480, 
config::exchg_node_buffer_size_bytes / num_queues);
     for (int i = 0; i < num_queues; ++i) {
         SenderQueue* queue = nullptr;
         if (_enable_pipeline) {
@@ -379,10 +375,9 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* 
stream_mgr, RuntimeState* sta
 
     // Initialize the counters
     _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
-    _blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", 
TUnit::BYTES, "MemoryUsage");
     _peak_memory_usage_counter =
-            _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, 
"MemoryUsage");
-    _bytes_received_counter = ADD_COUNTER(_profile, "BytesReceived", 
TUnit::BYTES);
+            _profile->add_counter("PeakMemoryUsage", TUnit::BYTES, 
"MemoryUsage");
+    _remote_bytes_received_counter = ADD_COUNTER(_profile, 
"RemoteBytesReceived", TUnit::BYTES);
     _local_bytes_received_counter = ADD_COUNTER(_profile, 
"LocalBytesReceived", TUnit::BYTES);
 
     _deserialize_row_batch_timer = ADD_TIMER(_profile, 
"DeserializeRowBatchTimer");
@@ -428,7 +423,6 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock, 
int sender_id, int be_n
 }
 
 void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
-    _mem_tracker->consume(block->allocated_bytes());
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->add_block(block, use_move);
 }
@@ -455,7 +449,6 @@ bool VDataStreamRecvr::ready_to_read() {
 
 Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
     _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
-    Defer release_mem([&]() { _mem_tracker->release(block->allocated_bytes()); 
});
     if (!_is_merging) {
         block->clear();
         return _sender_queues[0]->get_batch(block, eos);
@@ -482,16 +475,35 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) {
     }
 }
 
-void VDataStreamRecvr::SenderQueue::update_blocks_memory_usage(int64_t size) {
-    _recvr->update_blocks_memory_usage(size);
-    if (_local_channel_dependency && _recvr->exceeds_limit(0)) {
+void VDataStreamRecvr::SenderQueue::add_blocks_memory_usage(int64_t size) {
+    DCHECK(size >= 0);
+    _recvr->_mem_tracker->consume(size);
+    _queue_mem_tracker->consume(size);
+    if (_local_channel_dependency && exceeds_limit()) {
         _local_channel_dependency->block();
     }
 }
 
-void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
-    _blocks_memory_usage->add(size);
-    _blocks_memory_usage_current_value.fetch_add(size);
+void VDataStreamRecvr::SenderQueue::sub_blocks_memory_usage(int64_t size) {
+    DCHECK(size >= 0);
+    _recvr->_mem_tracker->release(size);
+    _queue_mem_tracker->release(size);
+    if (_local_channel_dependency && (!exceeds_limit())) {
+        _local_channel_dependency->set_ready();
+    }
+}
+
+bool VDataStreamRecvr::SenderQueue::exceeds_limit() {
+    const size_t queue_byte_size = _queue_mem_tracker->consumption();
+    return _recvr->queue_exceeds_limit(queue_byte_size);
+}
+
+bool VDataStreamRecvr::exceeds_limit(size_t block_byte_size) {
+    return _mem_tracker->consumption() + block_byte_size > 
config::exchg_node_buffer_size_bytes;
+}
+
+bool VDataStreamRecvr::queue_exceeds_limit(size_t queue_byte_size) const {
+    return queue_byte_size >= _sender_queue_mem_limit;
 }
 
 void VDataStreamRecvr::close() {
@@ -550,7 +562,7 @@ void VDataStreamRecvr::PipSenderQueue::add_block(Block* 
block, bool use_move) {
         _record_debug_info();
         try_set_dep_ready_without_lock();
         COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
-        update_blocks_memory_usage(block_mem_size);
+        add_blocks_memory_usage(block_mem_size);
         _data_arrival_cv.notify_one();
     }
 }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index cb44565e8c2..d447e5686e9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -111,17 +111,13 @@ public:
     // Careful: stream sender will call this function for a local receiver,
     // accessing members of receiver that are allocated by Object pool
     // in this function is not safe.
-    bool exceeds_limit(int batch_size) {
-        return _blocks_memory_usage_current_value + batch_size >
-               config::exchg_node_buffer_size_bytes;
-    }
-
+    bool exceeds_limit(size_t block_byte_size);
+    bool queue_exceeds_limit(size_t byte_size) const;
     bool is_closed() const { return _is_closed; }
 
     std::shared_ptr<pipeline::Dependency> get_local_channel_dependency(int 
sender_id);
 
 private:
-    void update_blocks_memory_usage(int64_t size);
     class PipSenderQueue;
 
     friend struct BlockSupplierSortCursorImpl;
@@ -146,13 +142,14 @@ private:
     std::unique_ptr<MemTracker> _mem_tracker;
     // Managed by object pool
     std::vector<SenderQueue*> _sender_queues;
+    size_t _sender_queue_mem_limit;
 
     std::unique_ptr<VSortedRunMerger> _merger;
 
     ObjectPool _sender_queue_pool;
     RuntimeProfile* _profile = nullptr;
 
-    RuntimeProfile::Counter* _bytes_received_counter = nullptr;
+    RuntimeProfile::Counter* _remote_bytes_received_counter = nullptr;
     RuntimeProfile::Counter* _local_bytes_received_counter = nullptr;
     RuntimeProfile::Counter* _deserialize_row_batch_timer = nullptr;
     RuntimeProfile::Counter* _first_batch_wait_total_timer = nullptr;
@@ -161,8 +158,6 @@ private:
     RuntimeProfile::Counter* _decompress_timer = nullptr;
     RuntimeProfile::Counter* _decompress_bytes = nullptr;
     RuntimeProfile::Counter* _memory_usage_counter = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
-    std::atomic<int64_t> _blocks_memory_usage_current_value = 0;
     RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
 
     // Number of rows received
@@ -222,7 +217,11 @@ public:
         _source_dependency = dependency;
     }
 
-    void update_blocks_memory_usage(int64_t size);
+    void add_blocks_memory_usage(int64_t size);
+
+    void sub_blocks_memory_usage(int64_t size);
+
+    bool exceeds_limit();
 
 protected:
     friend class pipeline::ExchangeLocalState;
@@ -282,6 +281,7 @@ protected:
     int _num_remaining_senders;
     std::condition_variable _data_arrival_cv;
     std::condition_variable _data_removal_cv;
+    std::unique_ptr<MemTracker> _queue_mem_tracker;
     std::list<std::pair<BlockUPtr, size_t>> _block_queue;
 
     bool _received_first_batch;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to