HappenLee commented on code in PR #44378: URL: https://github.com/apache/doris/pull/44378#discussion_r1855738442
########## be/src/vec/runtime/vdata_stream_recvr.cpp: ########## @@ -83,21 +82,33 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { } Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) { - if (_is_cancelled) { - RETURN_IF_ERROR(_cancel_status); - return Status::Cancelled("Cancelled"); - } + BlockItem block_item; + { + std::lock_guard<std::mutex> l(_lock); + //check and get block_item from data_queue + if (_is_cancelled) { + RETURN_IF_ERROR(_cancel_status); + return Status::Cancelled("Cancelled"); + } - if (_block_queue.empty()) { - DCHECK_EQ(_num_remaining_senders, 0); - *eos = true; - return Status::OK(); - } + if (_block_queue.empty()) { + DCHECK_EQ(_num_remaining_senders, 0); + *eos = true; + return Status::OK(); + } - DCHECK(!_block_queue.empty()); - auto [next_block, block_byte_size] = std::move(_block_queue.front()); - _block_queue.pop_front(); + DCHECK(!_block_queue.empty()); + block_item = std::move(_block_queue.front()); + _block_queue.pop_front(); + } + BlockUPtr next_block; + RETURN_IF_ERROR(block_item.get_block(next_block)); + size_t block_byte_size = block_item.block_byte_size(); + COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, block_item.deserialize_time()); + COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time()); + COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size); + std::lock_guard<std::mutex> l(_lock); Review Comment: rethink the logic -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org