This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 378b2f21e22 [opt](exec)lazy deserialize pblock in VDataStreamRecvr::SenderQueue (#44378) 378b2f21e22 is described below commit 378b2f21e22b11e8b912b077f2032e63c1ca0c81 Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Tue Nov 26 15:41:38 2024 +0800 [opt](exec)lazy deserialize pblock in VDataStreamRecvr::SenderQueue (#44378) ### What problem does this PR solve? Previously, for a `pblock` (serialized block), the block would be deserialized immediately after receiving the RPC request and then placed into the sender queue's `_block_queue`. Because deserialization ran inside the RPC handler, it consumed a significant share of RPC processing time and hurt overall performance. The new approach defers deserialization until the consumer fetches the block via `SenderQueue::get_batch`, which calls `BlockItem::get_block`. This has the following advantages: 1. Reduces time spent during the RPC handling phase. 2. Memory allocation for deserialization happens within the execution thread, improving cache locality and reducing contention on memory resources. ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [x] No need to test or manual test. Explain why: - [x] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. 
eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/vec/runtime/vdata_stream_mgr.cpp | 11 +++-- be/src/vec/runtime/vdata_stream_recvr.cpp | 68 +++++++++++++------------------ be/src/vec/runtime/vdata_stream_recvr.h | 49 ++++++++++++++++++---- 3 files changed, 76 insertions(+), 52 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 382a6d0e6e3..7dad3d2c867 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -18,10 +18,12 @@ #include "vec/runtime/vdata_stream_mgr.h" #include <gen_cpp/Types_types.h> +#include <gen_cpp/data.pb.h> #include <gen_cpp/internal_service.pb.h> #include <gen_cpp/types.pb.h> #include <stddef.h> +#include <memory> #include <ostream> #include <string> #include <vector> @@ -141,9 +143,12 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, bool eos = request->eos(); if (request->has_block()) { - RETURN_IF_ERROR(recvr->add_block( - request->block(), request->sender_id(), request->be_number(), request->packet_seq(), - eos ? nullptr : done, wait_for_worker, cpu_time_stop_watch.elapsed_time())); + std::unique_ptr<PBlock> pblock_ptr { + const_cast<PTransmitDataParams*>(request)->release_block()}; + RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), request->sender_id(), + request->be_number(), request->packet_seq(), + eos ? 
nullptr : done, wait_for_worker, + cpu_time_stop_watch.elapsed_time())); } if (eos) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index b48b9f780b8..81e4e1cd5f0 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -69,7 +69,6 @@ VDataStreamRecvr::SenderQueue::~SenderQueue() { } Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { - std::lock_guard<std::mutex> l(_lock); // protect _block_queue #ifndef NDEBUG if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, @@ -79,25 +78,33 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { _debug_string_info()); } #endif - return _inner_get_batch_without_lock(block, eos); -} + BlockItem block_item; + { + std::lock_guard<std::mutex> l(_lock); + //check and get block_item from data_queue + if (_is_cancelled) { + RETURN_IF_ERROR(_cancel_status); + return Status::Cancelled("Cancelled"); + } -Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) { - if (_is_cancelled) { - RETURN_IF_ERROR(_cancel_status); - return Status::Cancelled("Cancelled"); - } + if (_block_queue.empty()) { + DCHECK_EQ(_num_remaining_senders, 0); + *eos = true; + return Status::OK(); + } - if (_block_queue.empty()) { - DCHECK_EQ(_num_remaining_senders, 0); - *eos = true; - return Status::OK(); + DCHECK(!_block_queue.empty()); + block_item = std::move(_block_queue.front()); + _block_queue.pop_front(); } - - DCHECK(!_block_queue.empty()); - auto [next_block, block_byte_size] = std::move(_block_queue.front()); - _block_queue.pop_front(); + BlockUPtr next_block; + RETURN_IF_ERROR(block_item.get_block(next_block)); + size_t block_byte_size = block_item.block_byte_size(); + COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, block_item.deserialize_time()); + COUNTER_UPDATE(_recvr->_decompress_timer, 
block->get_decompress_time()); + COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size); + std::lock_guard<std::mutex> l(_lock); sub_blocks_memory_usage(block_byte_size); _record_debug_info(); if (_block_queue.empty() && _source_dependency) { @@ -133,7 +140,7 @@ void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() { } } -Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number, +Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done, const int64_t wait_for_worker, @@ -163,30 +170,12 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num } } - BlockUPtr block = nullptr; - int64_t deserialize_time = 0; - { - SCOPED_RAW_TIMER(&deserialize_time); - block = Block::create_unique(); - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(block->deserialize(pblock)); - } - - const auto rows = block->rows(); - if (rows == 0) { - return Status::OK(); - } - auto block_byte_size = block->allocated_bytes(); - VLOG_ROW << "added #rows=" << rows << " batch_size=" << block_byte_size << "\n"; - std::lock_guard<std::mutex> l(_lock); if (_is_cancelled) { return Status::OK(); } - COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time); - COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time()); - COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); - COUNTER_UPDATE(_recvr->_rows_produced_counter, rows); + const auto block_byte_size = pblock->ByteSizeLong(); COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1); if (_recvr->_max_wait_worker_time->value() < wait_for_worker) { _recvr->_max_wait_worker_time->set(wait_for_worker); @@ -196,7 +185,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr); } - 
_block_queue.emplace_back(std::move(block), block_byte_size); + _block_queue.emplace_back(std::move(pblock), block_byte_size); COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size); _record_debug_info(); try_set_dep_ready_without_lock(); @@ -370,7 +359,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::Exchang _first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime"); _decompress_timer = ADD_TIMER(_profile, "DecompressTime"); _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES); - _rows_produced_counter = ADD_COUNTER(_profile, "RowsProduced", TUnit::UNIT); _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT); _max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT); _max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT); @@ -401,13 +389,13 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr, return Status::OK(); } -Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number, +Status VDataStreamRecvr::add_block(std::unique_ptr<PBlock> pblock, int sender_id, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done, const int64_t wait_for_worker, const uint64_t time_to_find_recvr) { SCOPED_ATTACH_TASK(_query_thread_context); int use_sender_id = _is_merging ? 
sender_id : 0; - return _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done, + return _sender_queues[use_sender_id]->add_block(std::move(pblock), be_number, packet_seq, done, wait_for_worker, time_to_find_recvr); } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 08fb004f3b1..1639366c8b8 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -18,6 +18,7 @@ #pragma once #include <gen_cpp/Types_types.h> +#include <gen_cpp/data.pb.h> #include <glog/logging.h> #include <google/protobuf/stubs/callback.h> @@ -84,9 +85,9 @@ public: std::vector<SenderQueue*> sender_queues() const { return _sender_queues; } - Status add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq, - ::google::protobuf::Closure** done, const int64_t wait_for_worker, - const uint64_t time_to_find_recvr); + Status add_block(std::unique_ptr<PBlock> pblock, int sender_id, int be_number, + int64_t packet_seq, ::google::protobuf::Closure** done, + const int64_t wait_for_worker, const uint64_t time_to_find_recvr); void add_block(Block* block, int sender_id, bool use_move); @@ -157,8 +158,6 @@ private: RuntimeProfile::Counter* _decompress_timer = nullptr; RuntimeProfile::Counter* _decompress_bytes = nullptr; - // Number of rows received - RuntimeProfile::Counter* _rows_produced_counter = nullptr; // Number of blocks received RuntimeProfile::Counter* _blocks_produced_counter = nullptr; RuntimeProfile::Counter* _max_wait_worker_time = nullptr; @@ -181,7 +180,7 @@ public: Status get_batch(Block* next_block, bool* eos); - Status add_block(const PBlock& pblock, int be_number, int64_t packet_seq, + Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done, const int64_t wait_for_worker, const uint64_t time_to_find_recvr); @@ -205,8 +204,6 @@ public: protected: friend class pipeline::ExchangeLocalState; - 
Status _inner_get_batch_without_lock(Block* block, bool* eos); - void try_set_dep_ready_without_lock(); // To record information about several variables in the event of a DCHECK failure. @@ -260,7 +257,41 @@ protected: Status _cancel_status; int _num_remaining_senders; std::unique_ptr<MemTracker> _queue_mem_tracker; - std::list<std::pair<BlockUPtr, size_t>> _block_queue; + + // `BlockItem` is used in `_block_queue` to handle both local and remote exchange blocks. + // For local exchange blocks, `BlockUPtr` is used directly without any modification. + // For remote exchange blocks, the `pblock` is stored in `BlockItem`. + // When `getBlock` is called, the `pblock` is deserialized into a usable block. + struct BlockItem { + Status get_block(BlockUPtr& block) { + if (!_block) { + DCHECK(_pblock); + SCOPED_RAW_TIMER(&_deserialize_time); + _block = Block::create_unique(); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_block->deserialize(*_pblock)); + } + block.swap(_block); + _block.reset(); + return Status::OK(); + } + + size_t block_byte_size() const { return _block_byte_size; } + int64_t deserialize_time() const { return _deserialize_time; } + BlockItem() = default; + BlockItem(BlockUPtr&& block, size_t block_byte_size) + : _block(std::move(block)), _block_byte_size(block_byte_size) {} + + BlockItem(std::unique_ptr<PBlock>&& pblock, size_t block_byte_size) + : _block(nullptr), _pblock(std::move(pblock)), _block_byte_size(block_byte_size) {} + + private: + BlockUPtr _block; + std::unique_ptr<PBlock> _pblock; + size_t _block_byte_size = 0; + int64_t _deserialize_time = 0; + }; + + std::list<BlockItem> _block_queue; // sender_id std::unordered_set<int> _sender_eos_set; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org