This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d5df3bae25 [Bug](exchange) fix dcheck fail when VDataStreamRecvr input empty block (#22992) d5df3bae25 is described below commit d5df3bae25ff30bcb3e5c35ca0f374b6e368bb6f Author: Pxl <pxl...@qq.com> AuthorDate: Wed Aug 16 10:21:19 2023 +0800 [Bug](exchange) fix dcheck fail when VDataStreamRecvr input empty block (#22992) fix dcheck fail when VDataStreamRecvr input empty block --- .../vec/runtime/shared_hash_table_controller.cpp | 3 +-- be/src/vec/runtime/vdata_stream_recvr.cpp | 22 ++++++++++++++++------ be/src/vec/runtime/vdata_stream_recvr.h | 4 +++- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index 9353bdbbec..490a2cfdcd 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -57,8 +57,7 @@ bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragmen SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) { std::lock_guard<std::mutex> lock(_mutex); - auto it = _shared_contexts.find(my_node_id); - if (it == _shared_contexts.cend()) { + if (!_shared_contexts.count(my_node_id)) { _shared_contexts.insert({my_node_id, std::make_shared<SharedHashTableContext>()}); } return _shared_contexts[my_node_id]; diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 2e0ffaf9a3..0c76df5b9a 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -159,7 +159,11 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows()); COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1); - _block_queue.emplace_back(std::move(block), block_byte_size); + bool empty = !block->rows(); + + if (!empty) { + _block_queue.emplace_back(std::move(block), block_byte_size); + } // if done is nullptr, this function can't delay this response if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) { MonotonicStopWatch monotonicStopWatch; @@ -169,7 +173,9 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe *done = nullptr; } _recvr->_blocks_memory_usage->add(block_byte_size); - _data_arrival_cv.notify_one(); + if (!empty) { + _data_arrival_cv.notify_one(); + } } void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { @@ -205,8 +211,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows()); COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1); - _block_queue.emplace_back(std::move(nblock), block_mem_size); - _data_arrival_cv.notify_one(); + bool empty = !nblock->rows(); + + if (!empty) { + _block_queue.emplace_back(std::move(nblock), block_mem_size); + _data_arrival_cv.notify_one(); + } if (_recvr->exceeds_limit(block_mem_size)) { // yiguolei @@ -384,8 +394,8 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) { } bool VDataStreamRecvr::ready_to_read() { - for (size_t i = 0; i < _sender_queues.size(); i++) { - if (_sender_queues[i]->should_wait()) { + for (const auto& queue : _sender_queues) { + if (queue->should_wait()) { return false; } } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 03bf6f9db2..eb57c57b0d 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -237,7 +237,9 @@ public: } void add_block(Block* block, bool use_move) override { - if (block->rows() == 0) return; + if (block->rows() == 0) { + return; + } { std::unique_lock<std::mutex> l(_lock); if (_is_cancelled) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org