Gabriel39 commented on code in PR #44850:
URL: https://github.com/apache/doris/pull/44850#discussion_r1868661486


##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -724,4 +744,33 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
     return DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
 }
 
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer() {
+    PUniqueId id;
+    id.set_hi(_state->query_id().hi);
+    id.set_lo(_state->query_id().lo);
+    auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, state());
+    for (const auto& _dest : _dests) {
+        const auto& dest_fragment_instance_id = _dest.fragment_instance_id;
+        // There is no need to check for duplicate dest_fragment_instance_id here.
+        // The construct_request function already handles this check internally.
+        sink_buffer->construct_request(dest_fragment_instance_id);
+    }
+    return sink_buffer;
+}
+
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer() {
+    if (_child) {

Review Comment:
   Is `_child` always true?
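   If the planner does guarantee a child here, making that invariant explicit would answer this at a glance, e.g. (a sketch, not this PR's code):
   ```cpp
   // Hypothetical: assert the invariant instead of silently branching on it.
   DCHECK(_child) << "ExchangeSinkOperatorX expects a child operator";
   ```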



##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -724,4 +744,33 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
     return DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
 }
 
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer() {
+    PUniqueId id;
+    id.set_hi(_state->query_id().hi);
+    id.set_lo(_state->query_id().lo);
+    auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, state());
+    for (const auto& _dest : _dests) {
+        const auto& dest_fragment_instance_id = _dest.fragment_instance_id;
+        // There is no need to check for duplicate dest_fragment_instance_id here.
+        // The construct_request function already handles this check internally.
+        sink_buffer->construct_request(dest_fragment_instance_id);
+    }
+    return sink_buffer;
+}
+
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer() {
+    if (_child) {
+        if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child)) {
+            return _create_buffer();
+        }
+        if (std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {

Review Comment:
   Make an abstraction here instead of testing concrete operator types with `dynamic_pointer_cast`.
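   For example (a hypothetical shape; `require_dedicated_sink_buffer` is a made-up hook, not code from this PR), let each source operator declare the policy itself:
   ```cpp
   // In the operator base class:
   virtual bool require_dedicated_sink_buffer() const { return false; }
   // SortSourceOperatorX / LocalExchangeSourceOperatorX would override this to return true.

   std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer() {
       if (_child && _child->require_dedicated_sink_buffer()) {
           return _create_buffer();
       }
       return _sink_buffer; // hypothetical shared-buffer member
   }
   ```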



##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -1133,6 +1135,9 @@ public enum IgnoreSplitType {
     @VariableMgr.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
     private boolean enableLocalMergeSort = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_SHARED_EXCHANGE_SINK_BUFFER)
+    private boolean enableSharedExchangeSinkBuffer = true;

Review Comment:
   Add it to fuzzy variables.



##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -166,10 +166,9 @@ class Channel {
         return Status::OK();
     }
 
-    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
-        _buffer = buffer;
-        _buffer->register_sink(_fragment_instance_id);
-    }
+    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; }
+
+    InstanceLoId ins_id() const { return _fragment_instance_id.lo; }

Review Comment:
   ```suggestion
       InstanceLoId ins_lo_id() const { return _fragment_instance_id.lo; }
   ```



##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -158,12 +156,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
         if (request.block) {
            RETURN_IF_ERROR(
                    BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
-            COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
+            auto* parent = request.channel->_parent;
+            COUNTER_UPDATE(parent->memory_used_counter(), request.block->ByteSizeLong());

Review Comment:
   ```suggestion
               COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), request.block->ByteSizeLong());
   ```



##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -209,23 +207,27 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
 Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
 
-    DCHECK(_rpc_channel_is_idle[id] == false);
-
    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
             _instance_to_broadcast_package_queue[id];
 
-    if (_is_finishing) {
+    if (_is_failed) {
         _turn_off_channel(id, lock);
         return Status::OK();
     }
+    if (_instance_to_receiver_eof[id]) {
+        DCHECK(_rpc_channel_is_turn_off[id]);
+        return Status::OK();
+    }
 
     if (!q.empty()) {
         // If we have data to shuffle which is not broadcasted
         auto& request = q.front();
         auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
+        brpc_request->set_sender_id(request.channel->_parent->sender_id());
+        brpc_request->set_be_number(request.channel->_parent->be_number());

Review Comment:
   Add some comments to explain what `be_number` is used for.
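   For instance, something along these lines (my reading of the protocol; please verify against the receiver side):
   ```cpp
   // be_number identifies the sending backend and sender_id the sending instance,
   // so the receiver can keep per-sender state (e.g. packet sequence tracking)
   // even when one shared buffer serves several exchange sinks.
   brpc_request->set_sender_id(request.channel->_parent->sender_id());
   brpc_request->set_be_number(request.channel->_parent->be_number());
   ```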



##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -411,48 +413,25 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
         __builtin_unreachable();
     } else {
        std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-        _turn_off_channel(id, lock);
+        _running_sink_count[id]--;
+        if (_running_sink_count[id] == 0) {
+            _turn_off_channel(id, lock);
+        }
     }
 }
 
 void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
-    _is_finishing = true;
+    _is_failed = true;
     _context->cancel(Status::Cancelled(err));
 }
 
 void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
+    // When the receiving side reaches eof, it means the receiver has finished early.
+    // The remaining data in the current rpc_channel does not need to be sent,
+    // and the rpc_channel should be turned off immediately.
     _turn_off_channel(id, lock);
-    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =

Review Comment:
   Keep this code block
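   That is, on receiver EOF the queued broadcast blocks should still be drained so their memory is released promptly. Roughly (reconstructed from the removed lines, not verbatim):
   ```cpp
   std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
           _instance_to_broadcast_package_queue[id];
   // The receiver will never read these blocks; drop them now to free memory.
   while (!broadcast_q.empty()) {
       broadcast_q.pop();
   }
   ```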



##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -166,10 +166,9 @@ class Channel {
         return Status::OK();
     }
 
-    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
-        _buffer = buffer;
-        _buffer->register_sink(_fragment_instance_id);
-    }
+    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; }

Review Comment:
   Change `register_exchange_buffer` to `set_exchange_buffer`.
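   As a suggestion block (same body, renamed):
   ```suggestion
       void set_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; }
   ```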



##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -129,24 +125,26 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
     _rpc_channel_is_idle[low_id] = true;
+    _rpc_channel_is_turn_off[low_id] = false;
     _instance_to_receiver_eof[low_id] = false;
     _instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
     _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
-    _construct_request(low_id, finst_id);
+    _instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
+    _instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
+    _instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);
+
+    _instance_to_request[low_id]->set_node_id(_dest_node_id);
 }
 
 Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
-    if (_is_finishing) {
+    if (_is_failed) {
         return Status::OK();
     }
-    auto ins_id = request.channel->_fragment_instance_id.lo;
+    auto ins_id = request.channel->ins_id();
     if (!_instance_to_package_queue_mutex.contains(ins_id)) {
        return Status::InternalError("fragment_instance_id {} not do register_sink",
                                     print_id(request.channel->_fragment_instance_id));
     }
-    if (_is_receiver_eof(ins_id)) {

Review Comment:
   This check should not be deleted.
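   Roughly this shape (reconstructed from the removed line; the exact status returned should match the old code):
   ```cpp
   if (_is_receiver_eof(ins_id)) {
       return Status::EndOfFile("receiver eof");
   }
   ```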



##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -184,13 +223,10 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
     void update_profile(RuntimeProfile* profile);
 
     void set_dependency(std::shared_ptr<Dependency> queue_dependency,
-                        std::shared_ptr<Dependency> finish_dependency) {
-        _queue_dependency = queue_dependency;
-        _finish_dependency = finish_dependency;
-    }
-
-    void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
-        _broadcast_dependency = broadcast_dependency;
+                        ExchangeSinkLocalState* local_state) {
+        std::lock_guard lc(_init_lock);

Review Comment:
   Better to reduce locking scope
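   For example (hypothetical member names; a sketch of the idea, not this PR's code):
   ```cpp
   void set_dependency(std::shared_ptr<Dependency> queue_dependency,
                       ExchangeSinkLocalState* local_state) {
       {
           std::lock_guard lc(_init_lock);
           // Only the writes to shared containers need the lock.
           _queue_deps.push_back(std::move(queue_dependency));
           _parents.push_back(local_state);
       }
       // Per-call work that touches no shared state runs outside the lock.
   }
   ```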



##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -169,13 +169,52 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
     bool _eos;
 };
 
-// Each ExchangeSinkOperator have one ExchangeSinkBuffer
+// ExchangeSinkBuffer can either be shared among multiple ExchangeSinkLocalState instances
+// or be individually owned by each ExchangeSinkLocalState.
+// The following describes the scenario where ExchangeSinkBuffer is shared among multiple ExchangeSinkLocalState instances.
+// Of course, individual ownership can be seen as a special case where only one ExchangeSinkLocalState shares the buffer.
+
+// A sink buffer contains multiple rpc_channels.
+// Each rpc_channel corresponds to a target instance on the receiving side.
+// Data is sent using a ping-pong mode within each rpc_channel,
+// meaning that at most one RPC can exist in a single rpc_channel at a time.
+// The next RPC can only be sent after the previous one has completed.
+//
+// Each exchange sink sends data to all target instances on the receiving side.
+// If the concurrency is 3, a single rpc_channel will be used simultaneously by three exchange sinks.
+
+/*
+                          +-----------+          +-----------+        +-----------+
+                          |dest ins id|          |dest ins id|        |dest ins id|
+                          |           |          |           |        |           |
+                          +----+------+          +-----+-----+        +------+----+
+                               |                       |                     |
+                               |                       |                     |
+                      +----------------+      +----------------+     +----------------+
+                      |                |      |                |     |                |
+ sink buffer -------- |   rpc_channel  |      |  rpc_channel   |     |  rpc_channel   |
+                      |                |      |                |     |                |
+                      +-------+--------+      +----------------+     +----------------+
+                              |                        |                      |
+                              |------------------------+----------------------+
+                              |                        |                      |
+                              |                        |                      |
+                     +-----------------+       +-------+---------+    +-------+---------+
+                     |                 |       |                 |    |                 |
+                     |  exchange sink  |       |  exchange sink  |    |  exchange sink  |
+                     |                 |       |                 |    |                 |
+                     +-----------------+       +-----------------+    +-----------------+
+*/
+
 class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
 public:
-    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number,
-                       RuntimeState* state, ExchangeSinkLocalState* parent);
+    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, RuntimeState* state);
     ~ExchangeSinkBuffer() override = default;
-    void register_sink(TUniqueId);
+    void register_sink(InstanceLoId id) {
+        std::lock_guard lc(_init_lock);

Review Comment:
   Delete this lock.



##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -174,17 +175,14 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
 }
 
 Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
-    if (_is_finishing) {
+    if (_is_failed) {
         return Status::OK();
     }
-    auto ins_id = request.channel->_fragment_instance_id.lo;
+    auto ins_id = request.channel->ins_id();
     if (!_instance_to_package_queue_mutex.contains(ins_id)) {
        return Status::InternalError("fragment_instance_id {} not do register_sink",
                                     print_id(request.channel->_fragment_instance_id));
     }
-    if (_is_receiver_eof(ins_id)) {

Review Comment:
   This check should not be deleted.



##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -214,6 +250,10 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
     // One channel is corresponding to a downstream instance.
     phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
 
+    // There could be multiple situations that cause an rpc_channel to be turned off,
+    // such as receiving the eof, manual cancellation by the user, or all sinks reaching eos.
+    // Therefore, it is necessary to prevent an rpc_channel from being turned off multiple times.
+    phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_turn_off;

Review Comment:
   Could we unify these 2 maps?
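   For instance (hypothetical), one tri-state map would keep the idle/turned-off flags from ever disagreeing:
   ```cpp
   enum class RpcChannelState { kBusy, kIdle, kTurnedOff };
   phmap::flat_hash_map<InstanceLoId, RpcChannelState> _rpc_channel_state;
   ```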



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

