This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0896aefce3 [fix](local exchange) fix bug of accesssing released 
counter of local data stream receiver (#24148)
0896aefce3 is described below

commit 0896aefce37383856c44789f8aad2711b125e284
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Mon Sep 11 09:52:31 2023 +0800

    [fix](local exchange) fix bug of accesssing released counter of local data 
stream receiver (#24148)
---
 be/src/vec/runtime/vdata_stream_recvr.cpp | 13 ++++++++-----
 be/src/vec/runtime/vdata_stream_recvr.h   | 12 ++++++++++--
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index cc908d47e0..30588538d7 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -92,7 +92,7 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
 
     DCHECK(!_block_queue.empty());
     auto [next_block, block_byte_size] = std::move(_block_queue.front());
-    _recvr->_blocks_memory_usage->add(-block_byte_size);
+    _recvr->update_blocks_memory_usage(-block_byte_size);
     _block_queue.pop_front();
 
     if (!_pending_closures.empty()) {
@@ -173,7 +173,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_num
         _pending_closures.emplace_back(*done, monotonicStopWatch);
         *done = nullptr;
     }
-    _recvr->_blocks_memory_usage->add(block_byte_size);
+    _recvr->update_blocks_memory_usage(block_byte_size);
     if (!empty) {
         _data_arrival_cv.notify_one();
     }
@@ -220,7 +220,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* 
block, bool use_move) {
         _data_arrival_cv.notify_one();
     }
 
-    if (_recvr->exceeds_limit(block_mem_size)) {
+    // Careful: Accessing members of _recvr that are allocated by Object pool
+    // should be done before the following logic, because the _lock will be 
released
+    // by `iter->second->wait(l)`, after `iter->second->wait(l)` returns, 
_recvr may
+    // have been closed and resouces in _recvr are released;
+    _recvr->update_blocks_memory_usage(block_mem_size);
+    if (_recvr->exceeds_limit(0)) {
         // yiguolei
         // It is too tricky here, if the running thread is bthread then the 
tid may be wrong.
         std::thread::id tid = std::this_thread::get_id();
@@ -234,8 +239,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
         _pending_closures.emplace_back(iter->second.get(), monotonicStopWatch);
         iter->second->wait(l);
     }
-
-    _recvr->_blocks_memory_usage->add(block_mem_size);
 }
 
 void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index fe9910492b..d79e9ed90a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -102,14 +102,21 @@ public:
 
     void close();
 
+    // Careful: stream sender will call this function for a local receiver,
+    // accessing members of receiver that are allocated by Object pool
+    // in this function is not safe.
     bool exceeds_limit(int batch_size) {
-        return _blocks_memory_usage->current_value() + batch_size >
+        return _blocks_memory_usage_current_value + batch_size >
                config::exchg_node_buffer_size_bytes;
     }
 
     bool is_closed() const { return _is_closed; }
 
 private:
+    void update_blocks_memory_usage(int64_t size) {
+        _blocks_memory_usage->add(size);
+        _blocks_memory_usage_current_value = 
_blocks_memory_usage->current_value();
+    }
     class SenderQueue;
     class PipSenderQueue;
 
@@ -154,6 +161,7 @@ private:
     RuntimeProfile::Counter* _decompress_bytes;
     RuntimeProfile::Counter* _memory_usage_counter;
     RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
+    std::atomic<int64_t> _blocks_memory_usage_current_value = 0;
     RuntimeProfile::Counter* _peak_memory_usage_counter;
 
     // Number of rows received
@@ -268,7 +276,7 @@ public:
             }
             _block_queue.emplace_back(std::move(nblock), block_mem_size);
             COUNTER_UPDATE(_recvr->_local_bytes_received_counter, 
block_mem_size);
-            _recvr->_blocks_memory_usage->add(block_mem_size);
+            _recvr->update_blocks_memory_usage(block_mem_size);
             _data_arrival_cv.notify_one();
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to