yiguolei commented on code in PR #41968: URL: https://github.com/apache/doris/pull/41968#discussion_r1804072426
########## be/src/vec/sink/vdata_stream_sender.cpp: ########## @@ -309,67 +215,52 @@ Status Channel<Parent>::close_wait(RuntimeState* state) { return Status::OK(); } -template <typename Parent> -Status Channel<Parent>::close_internal(Status exec_status) { - if (!_need_close) { +Status Channel::close(RuntimeState* state) { + if (_closed) { return Status::OK(); } - VLOG_RPC << "Channel::close_internal() instance_id=" << print_id(_fragment_instance_id) - << " dest_node=" << _dest_node_id << " #rows= " - << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows()) - << " receiver status: " << _receiver_status << ", exec_status: " << exec_status; + _closed = true; + if (is_receiver_eof()) { _serializer.reset_block(); - return Status::OK(); } - Status status; - if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) { - status = send_current_block(true, exec_status); - } else { - if (is_local()) { - if (_recvr_is_valid()) { - _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); - } - } else { - // Non pipeline engine will send an empty eos block - status = send_remote_block((PBlock*)nullptr, true, exec_status); + + if (is_local()) { + if (_recvr_is_valid()) { + _local_recvr->remove_sender(_parent->sender_id(), _be_number, Status::OK()); } - } - // Don't wait for the last packet to finish, left it to close_wait. - if (status.is<ErrorCode::END_OF_FILE>()) { - return Status::OK(); } else { - return status; + RETURN_IF_ERROR(send_remote_block(nullptr, true, Status::OK())); } + return Status::OK(); } -template <typename Parent> -Status Channel<Parent>::close(RuntimeState* state, Status exec_status) { - if (_closed) { +Status Channel::_wait_last_brpc() { Review Comment: 我们都走exchange sink buffer了,为啥还需要wait rpc,这个操作应该是在exchange sink buffer 里完成的 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org