RDS removes a datagram from the retransmit queue when an ACK is
received. The ACK indicates that the receiver has queued the
RDS datagram, so that the sender can safely forget the datagram.

If the datagram to be removed had pinned pages set up, add
an entry to the rs->rs_znotify_queue so that the notification
will be sent up via rds_rm_zerocopy_callback() when the
rds_message is eventually freed by rds_message_purge().

Signed-off-by: Sowmini Varadhan <sowmini.varad...@oracle.com>
---
 net/rds/af_rds.c  |    3 ++
 net/rds/message.c |   67 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 net/rds/rds.h     |   13 +++++++++-
 net/rds/recv.c    |    3 ++
 net/rds/send.c    |    7 +++++
 5 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index b405f77..23126db 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -183,6 +183,8 @@ static unsigned int rds_poll(struct file *file, struct socket *sock,
                mask |= (POLLIN | POLLRDNORM);
        if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
                mask |= (POLLOUT | POLLWRNORM);
+       if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
+               mask |= POLLERR;
        read_unlock_irqrestore(&rs->rs_recv_lock, flags);
 
        /* clear state any time we wake a seen-congested socket */
@@ -511,6 +513,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
        INIT_LIST_HEAD(&rs->rs_send_queue);
        INIT_LIST_HEAD(&rs->rs_recv_queue);
        INIT_LIST_HEAD(&rs->rs_notify_queue);
+       INIT_LIST_HEAD(&rs->rs_znotify_queue);
        INIT_LIST_HEAD(&rs->rs_cong_list);
        spin_lock_init(&rs->rs_rdma_lock);
        rs->rs_rdma_keys = RB_ROOT;
diff --git a/net/rds/message.c b/net/rds/message.c
index ef3daaf..25c74b3 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -33,6 +33,9 @@
 #include <linux/kernel.h>
 #include <linux/slab.h>
 #include <linux/export.h>
+#include <linux/skbuff.h>
+#include <linux/list.h>
+#include <linux/errqueue.h>
 
 #include "rds.h"
 
@@ -53,6 +56,64 @@ void rds_message_addref(struct rds_message *rm)
 }
 EXPORT_SYMBOL_GPL(rds_message_addref);
 
+/*
+ * Drain rs->rs_znotify_queue and report the collected cookies to user space
+ * as one SO_EE_ORIGIN_ZEROCOPY skb on the socket error queue.  If the skb
+ * cannot be allocated the entries are put back so a later call can retry.
+ */
+static void rds_rm_zerocopy_callback(struct rds_sock *rs)
+{
+       struct sock *sk = rds_rs_to_sk(rs);
+       struct rds_znotifier *znotif, *ztmp;
+       struct sock_exterr_skb *serr;
+       struct sk_buff *skb;
+       unsigned long flags;
+       int ncookies = 0, i = 0;
+       u32 *ptr;
+       LIST_HEAD(tmp_list);
+
+       /* Detach the pending entries so they can be walked without rs_lock. */
+       spin_lock_irqsave(&rs->rs_lock, flags);
+       list_splice_init(&rs->rs_znotify_queue, &tmp_list);
+       spin_unlock_irqrestore(&rs->rs_lock, flags);
+
+       list_for_each_entry(znotif, &tmp_list, z_list)
+               ncookies++;
+       if (ncookies == 0)
+               return;
+       skb = alloc_skb(ncookies * sizeof(u32), GFP_ATOMIC);
+       if (!skb) {
+               /* Nothing was consumed yet: requeue ahead of newer entries. */
+               spin_lock_irqsave(&rs->rs_lock, flags);
+               list_splice(&tmp_list, &rs->rs_znotify_queue);
+               spin_unlock_irqrestore(&rs->rs_lock, flags);
+               return;
+       }
+
+       serr = SKB_EXT_ERR(skb);
+       serr->ee.ee_errno = 0;
+       serr->ee.ee_origin = SO_EE_ORIGIN_ZEROCOPY;
+       serr->ee.ee_data = ncookies;
+       serr->ee.ee_info = 0;
+       serr->ee.ee_code = SO_EE_CODE_ZEROCOPY_COPIED;
+       ptr = skb_put(skb, ncookies * sizeof(u32));
+
+       list_for_each_entry_safe(znotif, ztmp, &tmp_list, z_list) {
+               list_del(&znotif->z_list);
+               ptr[i++] = znotif->z_cookie;
+               mm_unaccount_pinned_pages(&znotif->z_mmp);
+               kfree(znotif);
+       }
+       WARN_ON(!list_empty(&tmp_list));
+
+       /*
+        * The znotifiers are freed, so this skb is the only record of the
+        * cookies: always queue it, even behind another zerocopy skb.
+        */
+       skb_queue_tail(&sk->sk_error_queue, skb);
+       sk->sk_error_report(sk);
+}
+
 /*
  * This relies on dma_map_sg() not touching sg[].page during merging.
  */
@@ -66,11 +127,15 @@ static void rds_message_purge(struct rds_message *rm)
        for (i = 0; i < rm->data.op_nents; i++) {
                rdsdebug("putting data page %p\n", (void 
*)sg_page(&rm->data.op_sg[i]));
                /* XXX will have to put_page for page refs */
-               __free_page(sg_page(&rm->data.op_sg[i]));
+               if (!rm->data.op_zcopy)
+                       __free_page(sg_page(&rm->data.op_sg[i]));
+               else
+                       put_page(sg_page(&rm->data.op_sg[i]));
        }
        rm->data.op_nents = 0;
        spin_lock_irqsave(&rm->m_rs_lock, flags);
        if (rm->m_rs) {
+               rds_rm_zerocopy_callback(rm->m_rs);
                sock_put(rds_rs_to_sk(rm->m_rs));
                rm->m_rs = NULL;
        }
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 374ae83..de5015a 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -356,6 +356,12 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
 #define RDS_MSG_PAGEVEC                7
 #define RDS_MSG_FLUSH          8
 
+struct rds_znotifier {                 /* one pending zerocopy completion */
+       struct list_head        z_list;   /* link on rs->rs_znotify_queue */
+       u32                     z_cookie; /* copied into the errqueue skb */
+       struct mmpin            z_mmp;    /* for mm_unaccount_pinned_pages() */
+};
+
 struct rds_message {
        refcount_t              m_refcount;
        struct list_head        m_sock_item;
@@ -431,11 +437,14 @@ struct rds_message {
                } rdma;
                struct rm_data_op {
                        unsigned int            op_active:1;
-                       unsigned int            op_notify:1;
+                       unsigned int            op_notify:1,
+                                               op_zcopy:1, /* zerocopy send */
+                                               op_pad_to_32:29; /* 1+1+1+29 */
                        unsigned int            op_nents;
                        unsigned int            op_count;
                        unsigned int            op_dmasg;
                        unsigned int            op_dmaoff;
+                       struct rds_znotifier    *op_mmp_znotifier;
                        struct scatterlist      *op_sg;
                } data;
        };
@@ -588,6 +597,8 @@ struct rds_sock {
        /* Socket receive path trace points*/
        u8                      rs_rx_traces;
        u8                      rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
+
+       struct list_head        rs_znotify_queue; /* zerocopy completion */
 };
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
diff --git a/net/rds/recv.c b/net/rds/recv.c
index b25bcfe..043f667 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -594,6 +594,9 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 
        if (msg_flags & MSG_OOB)
                goto out;
+       if (msg_flags & MSG_ERRQUEUE)
+               return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR,
+                                         msg_flags);
 
        while (1) {
                /* If there are pending notifications, do those - and nothing 
else */
diff --git a/net/rds/send.c b/net/rds/send.c
index 5ac0925..5c38ce3 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -635,7 +635,14 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status)
                if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
                        struct rm_rdma_op *ro = &rm->rdma;
                        struct rds_notifier *notifier;
+                       struct rds_znotifier *znotifier;
 
+                       if (rm->data.op_zcopy) {
+                               znotifier = rm->data.op_mmp_znotifier;
+                               list_add_tail(&znotifier->z_list,
+                                             &rs->rs_znotify_queue);
+                               rm->data.op_mmp_znotifier = NULL;
+                       }
                        list_del_init(&rm->m_sock_item);
                        rds_send_sndbuf_remove(rs, rm);
 
-- 
1.7.1

Reply via email to