yiguolei commented on code in PR #41968:
URL: https://github.com/apache/doris/pull/41968#discussion_r1804072426


##########
be/src/vec/sink/vdata_stream_sender.cpp:
##########
@@ -309,67 +215,52 @@ Status Channel<Parent>::close_wait(RuntimeState* state) {
     return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::close_internal(Status exec_status) {
-    if (!_need_close) {
+Status Channel::close(RuntimeState* state) {
+    if (_closed) {
         return Status::OK();
     }
-    VLOG_RPC << "Channel::close_internal() instance_id=" << 
print_id(_fragment_instance_id)
-             << " dest_node=" << _dest_node_id << " #rows= "
-             << ((_serializer.get_block() == nullptr) ? 0 : 
_serializer.get_block()->rows())
-             << " receiver status: " << _receiver_status << ", exec_status: " 
<< exec_status;
+    _closed = true;
+
     if (is_receiver_eof()) {
         _serializer.reset_block();
-        return Status::OK();
     }
-    Status status;
-    if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() 
> 0) {
-        status = send_current_block(true, exec_status);
-    } else {
-        if (is_local()) {
-            if (_recvr_is_valid()) {
-                _local_recvr->remove_sender(_parent->sender_id(), _be_number, 
exec_status);
-            }
-        } else {
-            // Non pipeline engine will send an empty eos block
-            status = send_remote_block((PBlock*)nullptr, true, exec_status);
+
+    if (is_local()) {
+        if (_recvr_is_valid()) {
+            _local_recvr->remove_sender(_parent->sender_id(), _be_number, 
Status::OK());
         }
-    }
-    // Don't wait for the last packet to finish, left it to close_wait.
-    if (status.is<ErrorCode::END_OF_FILE>()) {
-        return Status::OK();
     } else {
-        return status;
+        RETURN_IF_ERROR(send_remote_block(nullptr, true, Status::OK()));
     }
+    return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
+Status Channel::_wait_last_brpc() {

Review Comment:
   我们都走exchange sink buffer了,为啥还需要wait rpc,这个操作应该是在exchange sink buffer 里完成的



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to