This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8137ce536e9847bdf9fe6a5f776d5c8308a267f8
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Mon Sep 11 10:33:38 2023 +0800

    [fix](local exchange) fix bug of accessing released counter of local data stream receiver (#24160)
---
 be/src/vec/runtime/vdata_stream_recvr.cpp | 12 ++++++++----
 be/src/vec/runtime/vdata_stream_recvr.h   | 12 ++++++++++--
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 2e0ffaf9a3..fcd7d014c6 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -92,7 +92,7 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
 
     DCHECK(!_block_queue.empty());
     auto [next_block, block_byte_size] = std::move(_block_queue.front());
-    _recvr->_blocks_memory_usage->add(-block_byte_size);
+    _recvr->update_blocks_memory_usage(-block_byte_size);
     _block_queue.pop_front();
 
     if (!_pending_closures.empty()) {
@@ -168,7 +168,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
         _pending_closures.emplace_back(*done, monotonicStopWatch);
         *done = nullptr;
     }
-    _recvr->_blocks_memory_usage->add(block_byte_size);
+    _recvr->update_blocks_memory_usage(block_byte_size);
     _data_arrival_cv.notify_one();
 }
 
@@ -208,7 +208,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* 
block, bool use_move) {
     _block_queue.emplace_back(std::move(nblock), block_mem_size);
     _data_arrival_cv.notify_one();
 
-    if (_recvr->exceeds_limit(block_mem_size)) {
+    // Careful: Accessing members of _recvr that are allocated by Object pool
+    // should be done before the following logic, because the _lock will be released
+    // by `iter->second->wait(l)`, after `iter->second->wait(l)` returns, _recvr may
+    // have been closed and resources in _recvr are released.
+    _recvr->update_blocks_memory_usage(block_mem_size);
+    if (_recvr->exceeds_limit(0)) {
         // yiguolei
         // It is too tricky here, if the running thread is bthread then the tid may be wrong.
         std::thread::id tid = std::this_thread::get_id();
@@ -223,7 +228,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
         iter->second->wait(l);
     }
 
-    _recvr->_blocks_memory_usage->add(block_mem_size);
 }
 
 void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 03bf6f9db2..0059c8ddf0 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -102,14 +102,21 @@ public:
 
     void close();
 
+    // Careful: stream sender will call this function for a local receiver,
+    // accessing members of receiver that are allocated by Object pool
+    // in this function is not safe.
     bool exceeds_limit(int batch_size) {
-        return _blocks_memory_usage->current_value() + batch_size >
+        return _blocks_memory_usage_current_value + batch_size >
                config::exchg_node_buffer_size_bytes;
     }
 
     bool is_closed() const { return _is_closed; }
 
 private:
+    void update_blocks_memory_usage(int64_t size) {
+        _blocks_memory_usage->add(size);
+        _blocks_memory_usage_current_value = 
_blocks_memory_usage->current_value();
+    }
     class SenderQueue;
     class PipSenderQueue;
 
@@ -154,6 +161,7 @@ private:
     RuntimeProfile::Counter* _decompress_bytes;
     RuntimeProfile::Counter* _memory_usage_counter;
     RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
+    std::atomic<int64_t> _blocks_memory_usage_current_value = 0;
     RuntimeProfile::Counter* _peak_memory_usage_counter;
 
     // Number of rows received
@@ -266,7 +274,7 @@ public:
             }
             _block_queue.emplace_back(std::move(nblock), block_mem_size);
             COUNTER_UPDATE(_recvr->_local_bytes_received_counter, 
block_mem_size);
-            _recvr->_blocks_memory_usage->add(block_mem_size);
+            _recvr->update_blocks_memory_usage(block_mem_size);
             _data_arrival_cv.notify_one();
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to