On Wed, Jan 17, 2018 at 7:20 AM, Sowmini Varadhan
<sowmini.varad...@oracle.com> wrote:
> RDS removes a datagram from the retransmit queue when an ACK is
> received. The ACK indicates that the receiver has queued the
> RDS datagram, so that the sender can safely forget the datagram.
>
> If the datagram to be removed had pinned pages set up, add
> an entry to the rs->rs_znotify_queue so that the notifcation
> will be sent up via rds_rm_zerocopy_callback() when the
> rds_message is eventually freed by rds_message_purge.
>
> Signed-off-by: Sowmini Varadhan <sowmini.varad...@oracle.com>
> ---

> +static void rds_rm_zerocopy_callback(struct rds_sock *rs)
> +{
> +       struct sock *sk = rds_rs_to_sk(rs);
> +       struct sk_buff *skb;
> +       struct sock_exterr_skb *serr;
> +       struct sk_buff_head *q;
> +       unsigned long flags;
> +       struct sk_buff *tail;
> +       u32 *ptr;
> +       int ncookies = 0, i;
> +       struct rds_znotifier *znotif, *ztmp;
> +       LIST_HEAD(tmp_list);
> +
> +       spin_lock_irqsave(&rs->rs_lock, flags);
> +       list_splice(&rs->rs_znotify_queue, &tmp_list);
> +       INIT_LIST_HEAD(&rs->rs_znotify_queue);
> +       spin_unlock_irqrestore(&rs->rs_lock, flags);
> +
> +       list_for_each_entry_safe(znotif, ztmp, &tmp_list, z_list)
> +               ncookies++;
> +       if (ncookies == 0)
> +               return;
> +       skb = alloc_skb(ncookies * sizeof(u32), GFP_ATOMIC);
> +       if (!skb) {
> +               spin_lock_irqsave(&rs->rs_lock, flags);
> +               list_splice(&tmp_list, &rs->rs_znotify_queue);
> +               spin_unlock_irqrestore(&rs->rs_lock, flags);
> +               return;
> +       }

TCP zerocopy avoids this issue by allocating the notification skb when the
zerocopy packet is created; the RDS equivalent of that point would be
rds_message_copy_from_user.

This does not add an allocation if you use the same trick of stashing
the intermediate notification object (op_mmp_znotifier) in skb->cb. That said,
alloc_skb is probably more expensive than that kzalloc, if nothing else
because of more zeroing.

> +       serr = SKB_EXT_ERR(skb);
> +       serr->ee.ee_errno = 0;
> +       serr->ee.ee_origin = SO_EE_ORIGIN_ZEROCOPY;
> +       serr->ee.ee_data = ncookies;

This changes the semantics of these fields. Please add a new SO_EE_CODE flag,
even if the semantics can be derived from the packet family (for now).

Even better would be if we can avoid the cookies completely. I understand
the issue with concurrent send threads racing on obtaining a zckey value. If
the sender could learn which zckey was chosen for a call, would that suffice?

I suspect that even with concurrent senders, notifications arrive largely in
order, in which case we could just maintain the existing semantics and even
reuse that implementation.

> +       serr->ee.ee_info = 0;
> +       serr->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;

Why set the copied bit here?

> +       ptr = skb_put(skb, ncookies * sizeof(u32));
> +
> +       i = 0;
> +       list_for_each_entry_safe(znotif, ztmp, &tmp_list, z_list) {
> +               list_del(&znotif->z_list);
> +               ptr[i++] = znotif->z_cookie;
> +               mm_unaccount_pinned_pages(&znotif->z_mmp);
> +               kfree(znotif);
> +       }
> +       WARN_ON(!list_empty(&tmp_list));
> +       q = &sk->sk_error_queue;
> +       spin_lock_irqsave(&q->lock, flags);
> +       tail = skb_peek_tail(q);
> +       if (!tail ||
> +           SKB_EXT_ERR(tail)->ee.ee_origin != SO_EE_ORIGIN_ZEROCOPY)  {
> +               __skb_queue_tail(q, skb);
> +               skb = NULL;
> +       }

This drops the packet if the branch is not taken. In the TCP case this condition
means that we can try to coalesce packets, but that is not the case here.

> +       spin_unlock_irqrestore(&q->lock, flags);
> +       sk->sk_error_report(sk);
> +       consume_skb(skb);
> +}
> +
>  /*
>   * This relies on dma_map_sg() not touching sg[].page during merging.
>   */
> @@ -66,11 +127,15 @@ static void rds_message_purge(struct rds_message *rm)
>         for (i = 0; i < rm->data.op_nents; i++) {
>                 rdsdebug("putting data page %p\n", (void 
> *)sg_page(&rm->data.op_sg[i]));
>                 /* XXX will have to put_page for page refs */
> -               __free_page(sg_page(&rm->data.op_sg[i]));
> +               if (!rm->data.op_zcopy)
> +                       __free_page(sg_page(&rm->data.op_sg[i]));
> +               else
> +                       put_page(sg_page(&rm->data.op_sg[i]));
>         }
>         rm->data.op_nents = 0;
>         spin_lock_irqsave(&rm->m_rs_lock, flags);
>         if (rm->m_rs) {
> +               rds_rm_zerocopy_callback(rm->m_rs);
>                 sock_put(rds_rs_to_sk(rm->m_rs));
>                 rm->m_rs = NULL;
>         }
> diff --git a/net/rds/rds.h b/net/rds/rds.h
> index 374ae83..de5015a 100644
> --- a/net/rds/rds.h
> +++ b/net/rds/rds.h
> @@ -356,6 +356,12 @@ static inline u32 
> rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
>  #define RDS_MSG_PAGEVEC                7
>  #define RDS_MSG_FLUSH          8
>
> +struct rds_znotifier {
> +       struct list_head        z_list;
> +       u32                     z_cookie;
> +       struct mmpin            z_mmp;
> +};
> +
>  struct rds_message {
>         refcount_t              m_refcount;
>         struct list_head        m_sock_item;
> @@ -431,11 +437,14 @@ struct rds_message {
>                 } rdma;
>                 struct rm_data_op {
>                         unsigned int            op_active:1;
> -                       unsigned int            op_notify:1;
> +                       unsigned int            op_notify:1,
> +                                               op_zcopy:1,

The op_zcopy bit is not necessary if op_mmp_znotifier is NULL unless set in
rds_message_copy_from_user; a non-NULL pointer can then serve as the
zerocopy indicator.

> +                                               op_pad_to_32:30;
>                         unsigned int            op_nents;
>                         unsigned int            op_count;
>                         unsigned int            op_dmasg;
>                         unsigned int            op_dmaoff;
> +                       struct rds_znotifier    *op_mmp_znotifier;
>                         struct scatterlist      *op_sg;
>                 } data;

Reply via email to