On Wed, Feb 14, 2018 at 5:28 AM, Sowmini Varadhan
<sowmini.varad...@oracle.com> wrote:
> If the MSG_ZEROCOPY flag is specified with rds_sendmsg(), and,
> if the SO_ZEROCOPY socket option has been set on the PF_RDS socket,
> application pages sent down with rds_sendmsg() are pinned.
>
> The pinning uses the accounting infrastructure added by
> Commit a91dbff551a6 ("sock: ulimit on MSG_ZEROCOPY pages")
>
> The payload bytes in the message may not be modified for the
> duration that the message has been pinned. A multi-threaded
> application using this infrastructure may thus need to be notified
> about send-completion so that it can free/reuse the buffers
> passed to rds_sendmsg(). Notification of send-completion will
> identify each message-buffer by a cookie that the application
> must specify as ancillary data to rds_sendmsg().
> The ancillary data in this case has cmsg_level == SOL_RDS
> and cmsg_type == RDS_CMSG_ZCOPY_COOKIE.
>
> Signed-off-by: Sowmini Varadhan <sowmini.varad...@oracle.com>
> ---
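
For readers following along, here is a rough userspace sketch of the ancillary data the commit message describes: a cookie carried with cmsg_level == SOL_RDS and cmsg_type == RDS_CMSG_ZCOPY_COOKIE. It is only an illustration, not code from the patch. The helper name rds_zcopy_prepare() is hypothetical, the 32-bit cookie width is an assumption based on the sizeof(u32) in the quoted diff, and the fallback constant values are assumed to match linux/rds.h.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Fallbacks for userspace headers that predate this series; the values
 * are assumptions taken from linux/rds.h. */
#ifndef SOL_RDS
#define SOL_RDS 276
#endif
#ifndef RDS_CMSG_ZCOPY_COOKIE
#define RDS_CMSG_ZCOPY_COOKIE 12
#endif

/*
 * Hypothetical helper: describe one payload buffer in msg and attach a
 * 32-bit cookie as ancillary data. ctrl must be suitably aligned and at
 * least CMSG_SPACE(sizeof(uint32_t)) bytes long.
 */
static void rds_zcopy_prepare(struct msghdr *msg, struct iovec *iov,
                              void *payload, size_t len,
                              void *ctrl, size_t ctrl_len, uint32_t cookie)
{
    struct cmsghdr *cmsg;

    assert(ctrl_len >= CMSG_SPACE(sizeof(cookie)));

    memset(msg, 0, sizeof(*msg));
    memset(ctrl, 0, ctrl_len);

    /* The payload pages stay pinned until completion is reported. */
    iov->iov_base = payload;
    iov->iov_len = len;
    msg->msg_iov = iov;
    msg->msg_iovlen = 1;

    msg->msg_control = ctrl;
    msg->msg_controllen = CMSG_SPACE(sizeof(cookie));

    /* The cookie is what later identifies this buffer to the sender. */
    cmsg = CMSG_FIRSTHDR(msg);
    cmsg->cmsg_level = SOL_RDS;
    cmsg->cmsg_type = RDS_CMSG_ZCOPY_COOKIE;
    cmsg->cmsg_len = CMSG_LEN(sizeof(cookie));
    memcpy(CMSG_DATA(cmsg), &cookie, sizeof(cookie));
}
```

A caller would still have to fill in msg_name with the destination, enable SO_ZEROCOPY on the PF_RDS socket, and pass MSG_ZEROCOPY to sendmsg(). The buffer must not be modified or reused until the completion carrying the same cookie has been seen.
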
If the missing break is intentional, no need to respin just for the
other minor comments.

> @@ -341,12 +341,14 @@ struct rds_message *rds_message_map_pages(unsigned long
> *page_addrs, unsigned in
>  	return rm;
>  }
>
> -int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
> +int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
> +			       bool zcopy)
>  {
>  	unsigned long to_copy, nbytes;
>  	unsigned long sg_off;
>  	struct scatterlist *sg;
>  	int ret = 0;
> +	int length = iov_iter_count(from);
>
>  	rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
>
> @@ -356,6 +358,53 @@ int rds_message_copy_from_user(struct rds_message *rm,
> struct iov_iter *from)
>  	sg = rm->data.op_sg;
>  	sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
>
> +	if (zcopy) {
> +		int total_copied = 0;
> +		struct sk_buff *skb;
> +
> +		skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
> +				GFP_KERNEL);
> +		if (!skb)
> +			return -ENOMEM;
> +		rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
> +		memset(rm->data.op_mmp_znotifier, 0,
> +		       sizeof(*rm->data.op_mmp_znotifier));

Not strictly needed, as alloc_skb clears skb->cb[].

> +		if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
> +					    length)) {
> +			consume_skb(skb);
> +			rm->data.op_mmp_znotifier = NULL;
> +			return -ENOMEM;
> +		}

One less action to revert if moving the mm_account_pinned_pages
check before assigning op_mmp_znotifier. Conversely, move to an
err: label at the end to be able to deduplicate with the error
branch introduced below.

> @@ -875,12 +875,13 @@ static int rds_send_queue_rm(struct rds_sock *rs,
> struct rds_connection *conn,
>   * rds_message is getting to be quite complicated, and we'd like to allocate
>   * it all in one go. This figures out how big it needs to be up front.
>   */
> -static int rds_rm_size(struct msghdr *msg, int data_len)
> +static int rds_rm_size(struct msghdr *msg, int num_sgs)
>  {
>  	struct cmsghdr *cmsg;
>  	int size = 0;
>  	int cmsg_groups = 0;
>  	int retval;
> +	bool zcopy_cookie = false;
>
>  	for_each_cmsghdr(cmsg, msg) {
>  		if (!CMSG_OK(msg, cmsg))
> @@ -899,6 +900,8 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
>
>  			break;
>
> +		case RDS_CMSG_ZCOPY_COOKIE:
> +			zcopy_cookie = true;

break, or if intended to fall through, please label as such.

>  		case RDS_CMSG_RDMA_DEST:
>  		case RDS_CMSG_RDMA_MAP:
>  			cmsg_groups |= 2;
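
To make the two options for the RDS_CMSG_ZCOPY_COOKIE case concrete, here is a small standalone sketch. It is not the kernel code: the enum, struct, and function names are hypothetical stand-ins, and the quoted hunk alone does not say which behavior the patch intends.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the RDS cmsg types; values are illustrative. */
enum demo_cmsg_type {
    DEMO_CMSG_ZCOPY_COOKIE,
    DEMO_CMSG_RDMA_DEST,
    DEMO_CMSG_RDMA_MAP,
    DEMO_CMSG_OTHER,
};

struct demo_state {
    bool zcopy_cookie;
    int cmsg_groups;
};

/* Option 1: explicit break, so the cookie case leaves cmsg_groups alone. */
static void classify_with_break(enum demo_cmsg_type type,
                                struct demo_state *st)
{
    switch (type) {
    case DEMO_CMSG_ZCOPY_COOKIE:
        st->zcopy_cookie = true;
        break;
    case DEMO_CMSG_RDMA_DEST:
    case DEMO_CMSG_RDMA_MAP:
        st->cmsg_groups |= 2;
        break;
    default:
        break;
    }
}

/* Option 2: deliberate fall through, labeled so readers and the
 * compiler's implicit-fallthrough warning know it is intended. */
static void classify_with_fallthrough(enum demo_cmsg_type type,
                                      struct demo_state *st)
{
    switch (type) {
    case DEMO_CMSG_ZCOPY_COOKIE:
        st->zcopy_cookie = true;
        /* fall through */
    case DEMO_CMSG_RDMA_DEST:
    case DEMO_CMSG_RDMA_MAP:
        st->cmsg_groups |= 2;
        break;
    default:
        break;
    }
}
```

The only observable difference is whether a cookie cmsg also sets bit 2 in cmsg_groups. Making that choice visible in the source is the point of the request.
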