This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 378b2f21e22 [opt](exec)lazy deserialize pblock in 
VDataStreamRecvr::SenderQueue  (#44378)
378b2f21e22 is described below

commit 378b2f21e22b11e8b912b077f2032e63c1ca0c81
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Tue Nov 26 15:41:38 2024 +0800

    [opt](exec)lazy deserialize pblock in VDataStreamRecvr::SenderQueue  
(#44378)
    
    ### What problem does this PR solve?
    
    Previously, a `pblock` (serialized block) was deserialized immediately
    after the RPC request was received, and the resulting block was then
    placed into `_block_queue`. Deserializing on the RPC path made RPC
    handling significantly slower and hurt overall performance.
    The new approach defers deserialization until `get_batch` is called
    (via `BlockItem::get_block`). This has the following advantages:
    1. Less time is spent in the RPC handling phase.
    2. Memory for deserialization is allocated on the execution thread,
       which improves cache locality and reduces contention on memory
       resources.
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [x] No need to test or manual test. Explain why:
    - [x] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
        https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/vec/runtime/vdata_stream_mgr.cpp   | 11 +++--
 be/src/vec/runtime/vdata_stream_recvr.cpp | 68 +++++++++++++------------------
 be/src/vec/runtime/vdata_stream_recvr.h   | 49 ++++++++++++++++++----
 3 files changed, 76 insertions(+), 52 deletions(-)

diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 382a6d0e6e3..7dad3d2c867 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -18,10 +18,12 @@
 #include "vec/runtime/vdata_stream_mgr.h"
 
 #include <gen_cpp/Types_types.h>
+#include <gen_cpp/data.pb.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/types.pb.h>
 #include <stddef.h>
 
+#include <memory>
 #include <ostream>
 #include <string>
 #include <vector>
@@ -141,9 +143,12 @@ Status VDataStreamMgr::transmit_block(const 
PTransmitDataParams* request,
 
     bool eos = request->eos();
     if (request->has_block()) {
-        RETURN_IF_ERROR(recvr->add_block(
-                request->block(), request->sender_id(), request->be_number(), 
request->packet_seq(),
-                eos ? nullptr : done, wait_for_worker, 
cpu_time_stop_watch.elapsed_time()));
+        std::unique_ptr<PBlock> pblock_ptr {
+                const_cast<PTransmitDataParams*>(request)->release_block()};
+        RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), 
request->sender_id(),
+                                         request->be_number(), 
request->packet_seq(),
+                                         eos ? nullptr : done, wait_for_worker,
+                                         cpu_time_stop_watch.elapsed_time()));
     }
 
     if (eos) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index b48b9f780b8..81e4e1cd5f0 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -69,7 +69,6 @@ VDataStreamRecvr::SenderQueue::~SenderQueue() {
 }
 
 Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
-    std::lock_guard<std::mutex> l(_lock); // protect _block_queue
 #ifndef NDEBUG
     if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
         throw doris::Exception(ErrorCode::INTERNAL_ERROR,
@@ -79,25 +78,33 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* 
block, bool* eos) {
                                _debug_string_info());
     }
 #endif
-    return _inner_get_batch_without_lock(block, eos);
-}
+    BlockItem block_item;
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        //check and get block_item from data_queue
+        if (_is_cancelled) {
+            RETURN_IF_ERROR(_cancel_status);
+            return Status::Cancelled("Cancelled");
+        }
 
-Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* 
block, bool* eos) {
-    if (_is_cancelled) {
-        RETURN_IF_ERROR(_cancel_status);
-        return Status::Cancelled("Cancelled");
-    }
+        if (_block_queue.empty()) {
+            DCHECK_EQ(_num_remaining_senders, 0);
+            *eos = true;
+            return Status::OK();
+        }
 
-    if (_block_queue.empty()) {
-        DCHECK_EQ(_num_remaining_senders, 0);
-        *eos = true;
-        return Status::OK();
+        DCHECK(!_block_queue.empty());
+        block_item = std::move(_block_queue.front());
+        _block_queue.pop_front();
     }
-
-    DCHECK(!_block_queue.empty());
-    auto [next_block, block_byte_size] = std::move(_block_queue.front());
-    _block_queue.pop_front();
+    BlockUPtr next_block;
+    RETURN_IF_ERROR(block_item.get_block(next_block));
+    size_t block_byte_size = block_item.block_byte_size();
+    COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, 
block_item.deserialize_time());
+    COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
+    COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
     _recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size);
+    std::lock_guard<std::mutex> l(_lock);
     sub_blocks_memory_usage(block_byte_size);
     _record_debug_info();
     if (_block_queue.empty() && _source_dependency) {
@@ -133,7 +140,7 @@ void 
VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
     }
 }
 
-Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int 
be_number,
+Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> 
pblock, int be_number,
                                                 int64_t packet_seq,
                                                 ::google::protobuf::Closure** 
done,
                                                 const int64_t wait_for_worker,
@@ -163,30 +170,12 @@ Status VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_num
         }
     }
 
-    BlockUPtr block = nullptr;
-    int64_t deserialize_time = 0;
-    {
-        SCOPED_RAW_TIMER(&deserialize_time);
-        block = Block::create_unique();
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(block->deserialize(pblock));
-    }
-
-    const auto rows = block->rows();
-    if (rows == 0) {
-        return Status::OK();
-    }
-    auto block_byte_size = block->allocated_bytes();
-    VLOG_ROW << "added #rows=" << rows << " batch_size=" << block_byte_size << 
"\n";
-
     std::lock_guard<std::mutex> l(_lock);
     if (_is_cancelled) {
         return Status::OK();
     }
 
-    COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
-    COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
-    COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
-    COUNTER_UPDATE(_recvr->_rows_produced_counter, rows);
+    const auto block_byte_size = pblock->ByteSizeLong();
     COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
     if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
         _recvr->_max_wait_worker_time->set(wait_for_worker);
@@ -196,7 +185,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_num
         _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
     }
 
-    _block_queue.emplace_back(std::move(block), block_byte_size);
+    _block_queue.emplace_back(std::move(pblock), block_byte_size);
     COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
     _record_debug_info();
     try_set_dep_ready_without_lock();
@@ -370,7 +359,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* 
stream_mgr, pipeline::Exchang
     _first_batch_wait_total_timer = ADD_TIMER(_profile, 
"FirstBatchArrivalWaitTime");
     _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
     _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
-    _rows_produced_counter = ADD_COUNTER(_profile, "RowsProduced", 
TUnit::UNIT);
     _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", 
TUnit::UNIT);
     _max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", 
TUnit::UNIT);
     _max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", 
TUnit::UNIT);
@@ -401,13 +389,13 @@ Status VDataStreamRecvr::create_merger(const 
VExprContextSPtrs& ordering_expr,
     return Status::OK();
 }
 
-Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int 
be_number,
+Status VDataStreamRecvr::add_block(std::unique_ptr<PBlock> pblock, int 
sender_id, int be_number,
                                    int64_t packet_seq, 
::google::protobuf::Closure** done,
                                    const int64_t wait_for_worker,
                                    const uint64_t time_to_find_recvr) {
     SCOPED_ATTACH_TASK(_query_thread_context);
     int use_sender_id = _is_merging ? sender_id : 0;
-    return _sender_queues[use_sender_id]->add_block(pblock, be_number, 
packet_seq, done,
+    return _sender_queues[use_sender_id]->add_block(std::move(pblock), 
be_number, packet_seq, done,
                                                     wait_for_worker, 
time_to_find_recvr);
 }
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 08fb004f3b1..1639366c8b8 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <gen_cpp/Types_types.h>
+#include <gen_cpp/data.pb.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
 
@@ -84,9 +85,9 @@ public:
 
     std::vector<SenderQueue*> sender_queues() const { return _sender_queues; }
 
-    Status add_block(const PBlock& pblock, int sender_id, int be_number, 
int64_t packet_seq,
-                     ::google::protobuf::Closure** done, const int64_t 
wait_for_worker,
-                     const uint64_t time_to_find_recvr);
+    Status add_block(std::unique_ptr<PBlock> pblock, int sender_id, int 
be_number,
+                     int64_t packet_seq, ::google::protobuf::Closure** done,
+                     const int64_t wait_for_worker, const uint64_t 
time_to_find_recvr);
 
     void add_block(Block* block, int sender_id, bool use_move);
 
@@ -157,8 +158,6 @@ private:
     RuntimeProfile::Counter* _decompress_timer = nullptr;
     RuntimeProfile::Counter* _decompress_bytes = nullptr;
 
-    // Number of rows received
-    RuntimeProfile::Counter* _rows_produced_counter = nullptr;
     // Number of blocks received
     RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
     RuntimeProfile::Counter* _max_wait_worker_time = nullptr;
@@ -181,7 +180,7 @@ public:
 
     Status get_batch(Block* next_block, bool* eos);
 
-    Status add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
+    Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t 
packet_seq,
                      ::google::protobuf::Closure** done, const int64_t 
wait_for_worker,
                      const uint64_t time_to_find_recvr);
 
@@ -205,8 +204,6 @@ public:
 
 protected:
     friend class pipeline::ExchangeLocalState;
-    Status _inner_get_batch_without_lock(Block* block, bool* eos);
-
     void try_set_dep_ready_without_lock();
 
     // To record information about several variables in the event of a DCHECK 
failure.
@@ -260,7 +257,41 @@ protected:
     Status _cancel_status;
     int _num_remaining_senders;
     std::unique_ptr<MemTracker> _queue_mem_tracker;
-    std::list<std::pair<BlockUPtr, size_t>> _block_queue;
+
+    // `BlockItem` is one entry of `_block_queue`; it lets the queue hold both local and
+    // remote exchange blocks.
+    // - Local exchange: constructed from a `BlockUPtr`, which is stored and handed out
+    //   unchanged.
+    // - Remote exchange: constructed from the serialized `PBlock`. Deserialization is
+    //   deferred until `get_block()` is called (from `SenderQueue::get_batch`), so it runs
+    //   on the consumer thread instead of the RPC thread.
+    struct BlockItem {
+        // Moves this item's block into `block`, deserializing `_pblock` first if no block
+        // is held yet. Whatever `block` previously owned is released (swap, then reset).
+        // Returns a non-OK Status if deserialization fails (the macro name indicates that
+        // thrown exceptions are also converted to a Status).
+        // NOTE(review): `_pblock` is kept after deserialization, so a second call would
+        // deserialize it again. Presumably this is called exactly once per item; confirm.
+        Status get_block(BlockUPtr& block) {
+            if (!_block) {
+                DCHECK(_pblock);
+                // Time spent deserializing is accumulated into `_deserialize_time`.
+                SCOPED_RAW_TIMER(&_deserialize_time);
+                _block = Block::create_unique();
+                
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_block->deserialize(*_pblock));
+            }
+            block.swap(_block);
+            _block.reset();
+            return Status::OK();
+        }
+
+        // Byte size recorded when the item was enqueued. For remote items the caller
+        // passes the serialized size (`PBlock::ByteSizeLong()`), not the deserialized size.
+        size_t block_byte_size() const { return _block_byte_size; }
+        // Deserialization time for this item; stays 0 for local items, which are never
+        // deserialized.
+        int64_t deserialize_time() const { return _deserialize_time; }
+        BlockItem() = default;
+        // Local exchange: take ownership of an already-built block.
+        BlockItem(BlockUPtr&& block, size_t block_byte_size)
+                : _block(std::move(block)), _block_byte_size(block_byte_size) 
{}
+
+        // Remote exchange: take ownership of the serialized block; deserialized lazily.
+        BlockItem(std::unique_ptr<PBlock>&& pblock, size_t block_byte_size)
+                : _block(nullptr), _pblock(std::move(pblock)), 
_block_byte_size(block_byte_size) {}
+
+    private:
+        BlockUPtr _block;
+        std::unique_ptr<PBlock> _pblock;
+        size_t _block_byte_size = 0;
+        int64_t _deserialize_time = 0;
+    };
+
+    std::list<BlockItem> _block_queue;
 
     // sender_id
     std::unordered_set<int> _sender_eos_set;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to