This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0896aefce3 [fix](local exchange) fix bug of accesssing released counter of local data stream receiver (#24148) 0896aefce3 is described below commit 0896aefce37383856c44789f8aad2711b125e284 Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Mon Sep 11 09:52:31 2023 +0800 [fix](local exchange) fix bug of accesssing released counter of local data stream receiver (#24148) --- be/src/vec/runtime/vdata_stream_recvr.cpp | 13 ++++++++----- be/src/vec/runtime/vdata_stream_recvr.h | 12 ++++++++++-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index cc908d47e0..30588538d7 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -92,7 +92,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block DCHECK(!_block_queue.empty()); auto [next_block, block_byte_size] = std::move(_block_queue.front()); - _recvr->_blocks_memory_usage->add(-block_byte_size); + _recvr->update_blocks_memory_usage(-block_byte_size); _block_queue.pop_front(); if (!_pending_closures.empty()) { @@ -173,7 +173,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num _pending_closures.emplace_back(*done, monotonicStopWatch); *done = nullptr; } - _recvr->_blocks_memory_usage->add(block_byte_size); + _recvr->update_blocks_memory_usage(block_byte_size); if (!empty) { _data_arrival_cv.notify_one(); } @@ -220,7 +220,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { _data_arrival_cv.notify_one(); } - if (_recvr->exceeds_limit(block_mem_size)) { + // Careful: Accessing members of _recvr that are allocated by Object pool + // should be done before the following logic, because the _lock will be released + // by `iter->second->wait(l)`, after `iter->second->wait(l)` returns, _recvr 
may + // have been closed and resources in _recvr are released; + _recvr->update_blocks_memory_usage(block_mem_size); + if (_recvr->exceeds_limit(0)) { // yiguolei // It is too tricky here, if the running thread is bthread then the tid may be wrong. std::thread::id tid = std::this_thread::get_id(); @@ -234,8 +239,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { _pending_closures.emplace_back(iter->second.get(), monotonicStopWatch); iter->second->wait(l); } - - _recvr->_blocks_memory_usage->add(block_mem_size); } void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index fe9910492b..d79e9ed90a 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -102,14 +102,21 @@ public: void close(); + // Careful: stream sender will call this function for a local receiver, + // accessing members of receiver that are allocated by Object pool + // in this function is not safe. 
bool exceeds_limit(int batch_size) { - return _blocks_memory_usage->current_value() + batch_size > + return _blocks_memory_usage_current_value + batch_size > config::exchg_node_buffer_size_bytes; } bool is_closed() const { return _is_closed; } private: + void update_blocks_memory_usage(int64_t size) { + _blocks_memory_usage->add(size); + _blocks_memory_usage_current_value = _blocks_memory_usage->current_value(); + } class SenderQueue; class PipSenderQueue; @@ -154,6 +161,7 @@ private: RuntimeProfile::Counter* _decompress_bytes; RuntimeProfile::Counter* _memory_usage_counter; RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage; + std::atomic<int64_t> _blocks_memory_usage_current_value = 0; RuntimeProfile::Counter* _peak_memory_usage_counter; // Number of rows received @@ -268,7 +276,7 @@ public: } _block_queue.emplace_back(std::move(nblock), block_mem_size); COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); - _recvr->_blocks_memory_usage->add(block_mem_size); + _recvr->update_blocks_memory_usage(block_mem_size); _data_arrival_cv.notify_one(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org