This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e41ffc7cb51 [Refactor](Exec) Support one rpc send multi blocks (#50113)
e41ffc7cb51 is described below

commit e41ffc7cb51e5336c730e6e9e4d6b9c80c353c5b
Author: HappenLee <happen...@selectdb.com>
AuthorDate: Tue Apr 29 10:35:17 2025 +0800

    [Refactor](Exec) Support one rpc send multi blocks (#50113)
    
    1. Remove unused channel ptr in transmit struct to reduce memory consumption
    2. Send multiple blocks at one time in one RPC
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 256 +++++++++++++++------
 be/src/pipeline/exec/exchange_sink_buffer.h        |  15 +-
 be/src/vec/runtime/vdata_stream_mgr.cpp            |  12 +
 be/src/vec/sink/vdata_stream_sender.cpp            |   4 +-
 be/test/vec/exec/exchange_sink_test.h              |   5 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  13 ++
 gensrc/proto/internal_service.proto                |   1 +
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 8 files changed, 220 insertions(+), 87 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 95a5a00f68a..fc28f8b115c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -100,7 +100,13 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, 
PlanNodeId dest_node_
           _node_id(node_id),
           _state(state),
           _context(state->get_query_ctx()),
-          _exchange_sink_num(sender_ins_ids.size()) {}
+          _exchange_sink_num(sender_ins_ids.size()),
+          
_send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size
 &&
+                             
state->query_options().exchange_multi_blocks_byte_size > 0) {
+    if (_send_multi_blocks) {
+        _send_multi_blocks_byte_size = 
state->query_options().exchange_multi_blocks_byte_size;
+    }
+}
 
 void ExchangeSinkBuffer::close() {
     // Could not clear the queue here, because there maybe a running rpc want 
to
@@ -124,9 +130,12 @@ void ExchangeSinkBuffer::construct_request(TUniqueId 
fragment_instance_id) {
     auto instance_data = std::make_unique<RpcInstance>(low_id);
     instance_data->mutex = std::make_unique<std::mutex>();
     instance_data->seq = 0;
-    instance_data->package_queue = std::queue<TransmitInfo, 
std::list<TransmitInfo>>();
-    instance_data->broadcast_package_queue =
-            std::queue<BroadcastTransmitInfo, 
std::list<BroadcastTransmitInfo>>();
+    instance_data->package_queue =
+            std::unordered_map<vectorized::Channel*,
+                               std::queue<TransmitInfo, 
std::list<TransmitInfo>>>();
+    instance_data->broadcast_package_queue = std::unordered_map<
+            vectorized::Channel*,
+            std::queue<BroadcastTransmitInfo, 
std::list<BroadcastTransmitInfo>>>();
     _queue_capacity = config::exchg_buffer_queue_capacity_factor * 
_rpc_instances.size();
 
     PUniqueId finst_id;
@@ -146,14 +155,14 @@ void ExchangeSinkBuffer::construct_request(TUniqueId 
fragment_instance_id) {
     _rpc_instances[low_id] = std::move(instance_data);
 }
 
-Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
+Status ExchangeSinkBuffer::add_block(vectorized::Channel* channel, 
TransmitInfo&& request) {
     if (_is_failed) {
         return Status::OK();
     }
-    auto ins_id = request.channel->dest_ins_id();
+    auto ins_id = channel->dest_ins_id();
     if (!_rpc_instances.contains(ins_id)) {
         return Status::InternalError("fragment_instance_id {} not do 
register_sink",
-                                     
print_id(request.channel->_fragment_instance_id));
+                                     print_id(channel->_fragment_instance_id));
     }
     auto& instance_data = *_rpc_instances[ins_id];
     if (instance_data.rpc_channel_is_turn_off) {
@@ -170,10 +179,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& 
request) {
         if (request.block) {
             RETURN_IF_ERROR(
                     
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
-            COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
-                           request.block->ByteSizeLong());
+            COUNTER_UPDATE(channel->_parent->memory_used_counter(), 
request.block->ByteSizeLong());
         }
-        instance_data.package_queue.emplace(std::move(request));
+        instance_data.package_queue[channel].emplace(std::move(request));
         _total_queue_size++;
         if (_total_queue_size > _queue_capacity) {
             for (auto& dep : _queue_deps) {
@@ -188,14 +196,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& 
request) {
     return Status::OK();
 }
 
-Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
+Status ExchangeSinkBuffer::add_block(vectorized::Channel* channel,
+                                     BroadcastTransmitInfo&& request) {
     if (_is_failed) {
         return Status::OK();
     }
-    auto ins_id = request.channel->dest_ins_id();
+    auto ins_id = channel->dest_ins_id();
     if (!_rpc_instances.contains(ins_id)) {
         return Status::InternalError("fragment_instance_id {} not do 
register_sink",
-                                     
print_id(request.channel->_fragment_instance_id));
+                                     print_id(channel->_fragment_instance_id));
     }
     auto& instance_data = *_rpc_instances[ins_id];
     if (instance_data.rpc_channel_is_turn_off) {
@@ -213,7 +222,7 @@ Status 
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
             RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
                     request.block_holder->get_block()->be_exec_version()));
         }
-        instance_data.broadcast_package_queue.emplace(request);
+        instance_data.broadcast_package_queue[channel].emplace(request);
     }
     if (send_now) {
         RETURN_IF_ERROR(_send_rpc(instance_data));
@@ -225,9 +234,31 @@ Status 
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
 Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
     std::unique_lock<std::mutex> lock(*(instance_data.mutex));
 
-    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = 
instance_data.package_queue;
-    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& 
broadcast_q =
-            instance_data.broadcast_package_queue;
+    auto& q_map = instance_data.package_queue;
+    auto& broadcast_q_map = instance_data.broadcast_package_queue;
+
+    auto find_max_size_queue = [](vectorized::Channel*& channel, auto& ptr, 
auto& map) {
+        for (auto& [chan, lists] : map) {
+            if (!ptr) {
+                if (!lists.empty()) {
+                    channel = chan;
+                    ptr = &lists;
+                }
+            } else {
+                if (ptr->size() < lists.size()) {
+                    channel = chan;
+                    ptr = &lists;
+                }
+            }
+        }
+    };
+
+    vectorized::Channel* channel = nullptr;
+
+    std::queue<TransmitInfo, std::list<TransmitInfo>>* q_ptr = nullptr;
+    find_max_size_queue(channel, q_ptr, q_map);
+    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>* 
broadcast_q_ptr = nullptr;
+    find_max_size_queue(channel, broadcast_q_ptr, broadcast_q_map);
 
     if (_is_failed) {
         _turn_off_channel(instance_data, lock);
@@ -237,20 +268,49 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& 
instance_data) {
         return Status::OK();
     }
 
-    if (!q.empty()) {
+    auto mem_byte = 0;
+    if (q_ptr && !q_ptr->empty()) {
+        auto& q = *q_ptr;
+
+        std::vector<TransmitInfo> requests(_send_multi_blocks ? q.size() : 1);
+        for (int i = 0; i < requests.size(); i++) {
+            requests[i] = std::move(q.front());
+            q.pop();
+
+            if (requests[i].block) {
+                // make sure rpc byte size under the 
_send_multi_blocks_bytes_size
+                mem_byte += requests[i].block->ByteSizeLong();
+                if (_send_multi_blocks && mem_byte > 
_send_multi_blocks_byte_size) {
+                    requests.resize(i + 1);
+                    break;
+                }
+            }
+        }
+
         // If we have data to shuffle which is not broadcasted
-        auto& request = q.front();
+        auto& request = requests[0];
         auto& brpc_request = instance_data.request;
-        brpc_request->set_eos(request.eos);
-        brpc_request->set_packet_seq(instance_data.seq++);
-        brpc_request->set_sender_id(request.channel->_parent->sender_id());
-        brpc_request->set_be_number(request.channel->_parent->be_number());
-        if (request.block && !request.block->column_metas().empty()) {
-            brpc_request->set_allocated_block(request.block.get());
+        brpc_request->set_sender_id(channel->_parent->sender_id());
+        brpc_request->set_be_number(channel->_parent->be_number());
+
+        if (_send_multi_blocks) {
+            for (auto& req : requests) {
+                if (req.block && !req.block->column_metas().empty()) {
+                    auto add_block = brpc_request->add_blocks();
+                    add_block->Swap(req.block.get());
+                }
+            }
+        } else {
+            if (request.block && !request.block->column_metas().empty()) {
+                brpc_request->set_allocated_block(request.block.get());
+            }
         }
-        auto send_callback = 
request.channel->get_send_callback(&instance_data, request.eos);
 
-        
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
+        instance_data.seq += requests.size();
+        brpc_request->set_packet_seq(instance_data.seq);
+        brpc_request->set_eos(requests.back().eos);
+        auto send_callback = channel->get_send_callback(&instance_data, 
requests.back().eos);
+        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
         if (config::execution_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
@@ -308,38 +368,79 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& 
instance_data) {
             if (enable_http_send_block(*brpc_request)) {
                 RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
                                                       
std::move(send_remote_block_closure),
-                                                      
request.channel->_brpc_dest_addr));
+                                                      
channel->_brpc_dest_addr));
             } else {
-                transmit_blockv2(*request.channel->_brpc_stub,
-                                 std::move(send_remote_block_closure));
+                transmit_blockv2(*channel->_brpc_stub, 
std::move(send_remote_block_closure));
             }
         }
-        if (request.block) {
-            COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
-                           -request.block->ByteSizeLong());
+
+        if (!_send_multi_blocks && request.block) {
             static_cast<void>(brpc_request->release_block());
+        } else {
+            brpc_request->clear_blocks();
         }
-        q.pop();
-        _total_queue_size--;
+        if (mem_byte) {
+            COUNTER_UPDATE(channel->_parent->memory_used_counter(), -mem_byte);
+        }
+        DCHECK_GE(_total_queue_size, requests.size());
+        _total_queue_size -= (int)requests.size();
         if (_total_queue_size <= _queue_capacity) {
             for (auto& dep : _queue_deps) {
                 dep->set_ready();
             }
         }
-    } else if (!broadcast_q.empty()) {
+    } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) {
+        auto& broadcast_q = *broadcast_q_ptr;
         // If we have data to shuffle which is broadcasted
-        auto& request = broadcast_q.front();
+        std::vector<BroadcastTransmitInfo> requests(_send_multi_blocks ? 
broadcast_q.size() : 1);
+        for (int i = 0; i < requests.size(); i++) {
+            requests[i] = broadcast_q.front();
+            broadcast_q.pop();
+
+            if (requests[i].block_holder->get_block()) {
+                // make sure rpc byte size under the 
_send_multi_blocks_bytes_size
+                mem_byte += 
requests[i].block_holder->get_block()->ByteSizeLong();
+                if (_send_multi_blocks && mem_byte > 
_send_multi_blocks_byte_size) {
+                    requests.resize(i + 1);
+                    break;
+                }
+            }
+        }
+
+        auto& request = requests[0];
         auto& brpc_request = instance_data.request;
-        brpc_request->set_eos(request.eos);
-        brpc_request->set_packet_seq(instance_data.seq++);
-        brpc_request->set_sender_id(request.channel->_parent->sender_id());
-        brpc_request->set_be_number(request.channel->_parent->be_number());
-        if (request.block_holder->get_block() &&
-            !request.block_holder->get_block()->column_metas().empty()) {
-            
brpc_request->set_allocated_block(request.block_holder->get_block());
+        brpc_request->set_sender_id(channel->_parent->sender_id());
+        brpc_request->set_be_number(channel->_parent->be_number());
+
+        if (_send_multi_blocks) {
+            for (int i = 0; i < requests.size(); i++) {
+                auto& req = requests[i];
+                if (auto block = req.block_holder->get_block();
+                    block && !block->column_metas().empty()) {
+                    auto add_block = brpc_request->add_blocks();
+                    for (int j = 0; j < block->column_metas_size(); ++j) {
+                        
add_block->add_column_metas()->CopyFrom(block->column_metas(j));
+                    }
+                    add_block->set_be_exec_version(block->be_exec_version());
+                    add_block->set_compressed(block->compressed());
+                    add_block->set_compression_type(block->compression_type());
+                    
add_block->set_uncompressed_size(block->uncompressed_size());
+                    add_block->set_allocated_column_values(
+                            const_cast<std::string*>(&block->column_values()));
+                }
+            }
+        } else {
+            if (request.block_holder->get_block() &&
+                !request.block_holder->get_block()->column_metas().empty()) {
+                
brpc_request->set_allocated_block(request.block_holder->get_block());
+            }
         }
-        auto send_callback = 
request.channel->get_send_callback(&instance_data, request.eos);
-        
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
+        instance_data.seq += requests.size();
+        brpc_request->set_packet_seq(instance_data.seq);
+        brpc_request->set_eos(requests.back().eos);
+        auto send_callback = channel->get_send_callback(&instance_data, 
requests.back().eos);
+
+        send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms);
         if (config::execution_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
@@ -397,16 +498,19 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& 
instance_data) {
             if (enable_http_send_block(*brpc_request)) {
                 RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
                                                       
std::move(send_remote_block_closure),
-                                                      
request.channel->_brpc_dest_addr));
+                                                      
channel->_brpc_dest_addr));
             } else {
-                transmit_blockv2(*request.channel->_brpc_stub,
-                                 std::move(send_remote_block_closure));
+                transmit_blockv2(*channel->_brpc_stub, 
std::move(send_remote_block_closure));
             }
         }
-        if (request.block_holder->get_block()) {
+        if (!_send_multi_blocks && request.block_holder->get_block()) {
             static_cast<void>(brpc_request->release_block());
+        } else {
+            for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) {
+                
static_cast<void>(brpc_request->mutable_blocks(i)->release_column_values());
+            }
+            brpc_request->clear_blocks();
         }
-        broadcast_q.pop();
     } else {
         instance_data.rpc_channel_is_idle = true;
     }
@@ -436,27 +540,27 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& 
ins) {
     // and the rpc_channel should be turned off immediately.
     Defer turn_off([&]() { _turn_off_channel(ins, lock); });
 
-    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& 
broadcast_q =
-            ins.broadcast_package_queue;
-    for (; !broadcast_q.empty(); broadcast_q.pop()) {
-        if (broadcast_q.front().block_holder->get_block()) {
-            
COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(),
-                           
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+    auto& broadcast_q_map = ins.broadcast_package_queue;
+    for (auto& [channel, broadcast_q] : broadcast_q_map) {
+        for (; !broadcast_q.empty(); broadcast_q.pop()) {
+            if (broadcast_q.front().block_holder->get_block()) {
+                COUNTER_UPDATE(channel->_parent->memory_used_counter(),
+                               
-broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+            }
         }
     }
-    {
-        std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> 
empty;
-        swap(empty, broadcast_q);
-    }
-
-    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = ins.package_queue;
-    for (; !q.empty(); q.pop()) {
-        // Must update _total_queue_size here, otherwise if _total_queue_size 
> _queue_capacity at EOF,
-        // ExchangeSinkQueueDependency will be blocked and pipeline will be 
deadlocked
-        _total_queue_size--;
-        if (q.front().block) {
-            COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(),
-                           -q.front().block->ByteSizeLong());
+    broadcast_q_map.clear();
+
+    auto& q_map = ins.package_queue;
+    for (auto& [channel, q] : q_map) {
+        for (; !q.empty(); q.pop()) {
+            // Must update _total_queue_size here, otherwise if 
_total_queue_size > _queue_capacity at EOF,
+            // ExchangeSinkQueueDependency will be blocked and pipeline will 
be deadlocked
+            _total_queue_size--;
+            if (q.front().block) {
+                COUNTER_UPDATE(channel->_parent->memory_used_counter(),
+                               -q.front().block->ByteSizeLong());
+            }
         }
     }
 
@@ -467,10 +571,7 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& 
ins) {
         }
     }
 
-    {
-        std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
-        swap(empty, q);
-    }
+    q_map.clear();
 }
 
 // The unused parameter `with_lock` is to ensure that the function is called 
when the lock is held.
@@ -582,8 +683,11 @@ std::string 
ExchangeSinkBuffer::debug_each_instance_queue_size() {
     fmt::memory_buffer debug_string_buffer;
     for (auto& [id, instance_data] : _rpc_instances) {
         std::unique_lock<std::mutex> lock(*instance_data->mutex);
-        fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", 
id,
-                       instance_data->package_queue.size());
+        auto queue_size = 0;
+        for (auto& [_, list] : instance_data->package_queue) {
+            queue_size += list.size();
+        }
+        fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", 
id, queue_size);
     }
     return fmt::to_string(debug_string_buffer);
 }
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 9a00ef072d5..44416ef68e1 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -111,13 +111,11 @@ private:
 
 namespace pipeline {
 struct TransmitInfo {
-    vectorized::Channel* channel = nullptr;
     std::unique_ptr<PBlock> block;
     bool eos;
 };
 
 struct BroadcastTransmitInfo {
-    vectorized::Channel* channel = nullptr;
     std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
     bool eos;
 };
@@ -144,10 +142,13 @@ struct RpcInstance {
     int64_t seq = 0;
 
     // Queue for regular data transmission requests
-    std::queue<TransmitInfo, std::list<TransmitInfo>> package_queue;
+    std::unordered_map<vectorized::Channel*, std::queue<TransmitInfo, 
std::list<TransmitInfo>>>
+            package_queue;
 
     // Queue for broadcast data transmission requests
-    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> 
broadcast_package_queue;
+    std::unordered_map<vectorized::Channel*,
+                       std::queue<BroadcastTransmitInfo, 
std::list<BroadcastTransmitInfo>>>
+            broadcast_package_queue;
 
     // RPC request parameters for data transmission
     std::shared_ptr<PTransmitDataParams> request;
@@ -275,8 +276,8 @@ public:
 
     void construct_request(TUniqueId);
 
-    Status add_block(TransmitInfo&& request);
-    Status add_block(BroadcastTransmitInfo&& request);
+    Status add_block(vectorized::Channel* channel, TransmitInfo&& request);
+    Status add_block(vectorized::Channel* channel, BroadcastTransmitInfo&& 
request);
     void close();
     void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t 
receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
@@ -341,6 +342,8 @@ private:
     // The ExchangeSinkLocalState in _parents is only used in 
_turn_off_channel.
     std::vector<ExchangeSinkLocalState*> _parents;
     const int64_t _exchange_sink_num;
+    bool _send_multi_blocks = false;
+    int _send_multi_blocks_byte_size = 256 * 1024;
 };
 
 } // namespace pipeline
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 535f59c49e8..2a4f4e22861 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -146,6 +146,18 @@ Status VDataStreamMgr::transmit_block(const 
PTransmitDataParams* request,
     }
 
     bool eos = request->eos();
+    if (!request->blocks().empty()) {
+        for (int i = 0; i < request->blocks_size(); i++) {
+            std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
+            pblock_ptr->Swap(const_cast<PBlock*>(&request->blocks(i)));
+            RETURN_IF_ERROR(recvr->add_block(
+                    std::move(pblock_ptr), request->sender_id(), 
request->be_number(),
+                    request->packet_seq() - request->blocks_size() + i, eos ? 
nullptr : done,
+                    wait_for_worker, cpu_time_stop_watch.elapsed_time()));
+        }
+    }
+
+    // old logic, for compatibility
     if (request->has_block()) {
         std::unique_ptr<PBlock> pblock_ptr {
                 const_cast<PTransmitDataParams*>(request)->release_block()};
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 3f09d02f20f..0e9c5371a2b 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -174,7 +174,7 @@ Status Channel::send_remote_block(std::unique_ptr<PBlock>&& 
block, bool eos) {
         }
     }
     if (eos || block->column_metas_size()) {
-        RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos}));
+        RETURN_IF_ERROR(_buffer->add_block(this, {std::move(block), eos}));
     }
     return Status::OK();
 }
@@ -188,7 +188,7 @@ Status 
Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& blo
         _eos_send = true;
     }
     if (eos || block->get_block()->column_metas_size()) {
-        RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
+        RETURN_IF_ERROR(_buffer->add_block(this, {block, eos}));
     }
     return Status::OK();
 }
diff --git a/be/test/vec/exec/exchange_sink_test.h 
b/be/test/vec/exec/exchange_sink_test.h
index 59004a53a9e..5cf0fdbed6a 100644
--- a/be/test/vec/exec/exchange_sink_test.h
+++ b/be/test/vec/exec/exchange_sink_test.h
@@ -138,9 +138,8 @@ struct SinkWithChannel {
     std::map<int64_t, std::shared_ptr<Channel>> channels;
     Status add_block(int64_t id, bool eos) {
         auto channel = channels[id];
-        TransmitInfo transmitInfo {
-                .channel = channel.get(), .block = std::make_unique<PBlock>(), 
.eos = eos};
-        return buffer->add_block(std::move(transmitInfo));
+        TransmitInfo transmitInfo {.block = std::make_unique<PBlock>(), .eos = 
eos};
+        return buffer->add_block(channel.get(), std::move(transmitInfo));
     }
 };
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c7d26af924f..67e52b66133 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -674,6 +674,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_ES_PARALLEL_SCROLL = 
"enable_es_parallel_scroll";
 
+    public static final String EXCHANGE_MULTI_BLOCKS_BYTE_SIZE = 
"exchange_multi_blocks_byte_size";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -2243,6 +2245,10 @@ public class SessionVariable implements Serializable, 
Writable {
                     "When processing both \\n and \\r\\n as CSV line 
separators, should \\r be retained?"})
     public boolean keepCarriageReturn = false;
 
+    @VariableMgr.VarAttr(name = EXCHANGE_MULTI_BLOCKS_BYTE_SIZE,
+            description = {"Enable exchange to send multiple blocks in one 
RPC. Default is 256KB. A negative"
+                    + " value disables multi-block exchange."})
+    public int exchangeMultiBlocksByteSize = 256 * 1024;
 
     @VariableMgr.VarAttr(name = FORCE_JNI_SCANNER,
             description = {"强制使用jni方式读取外表", "Force the use of jni mode to read 
external table"})
@@ -2591,6 +2597,12 @@ public class SessionVariable implements Serializable, 
Writable {
         this.disableStreamPreaggregations = random.nextBoolean();
         this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
         this.enableParallelResultSink = random.nextBoolean();
+
+        // 4KB = 4 * 1024 bytes
+        int minBytes = 4 * 1024;
+        // 10MB = 10 * 1024 * 1024 bytes
+        int maxBytes = 10 * 1024 * 1024;
+        this.exchangeMultiBlocksByteSize = minBytes + (int) 
(random.nextDouble() * (maxBytes - minBytes));
         int randomInt = random.nextInt(4);
         if (randomInt % 2 == 0) {
             this.rewriteOrToInPredicateThreshold = 100000;
@@ -4165,6 +4177,7 @@ public class SessionVariable implements Serializable, 
Writable {
         
tResult.setEnableRuntimeFilterPartitionPrune(enableRuntimeFilterPartitionPrune);
 
         
tResult.setMinimumOperatorMemoryRequiredKb(minimumOperatorMemoryRequiredKB);
+        tResult.setExchangeMultiBlocksByteSize(exchangeMultiBlocksByteSize);
         return tResult;
     }
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 402f1044568..e36907e9ea1 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -51,6 +51,7 @@ message PTransmitDataParams {
     optional bool transfer_by_attachment = 10 [default = false];
     optional PUniqueId query_id = 11;
     optional PStatus exec_status = 12;
+    repeated PBlock blocks = 13;
 };
 
 message PTransmitDataResult {
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 60ca569949f..81e4d1f877c 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -393,6 +393,7 @@ struct TQueryOptions {
   162: optional bool dump_heap_profile_when_mem_limit_exceeded = false
   163: optional bool inverted_index_compatible_read = false
   164: optional bool check_orc_init_sargs_success = false
+  165: optional i32 exchange_multi_blocks_byte_size = 262144 
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to