From: Björn Töpel <bjorn.to...@intel.com>

Start making use of the new ndo_bpf sub-commands, XDP_REGISTER_XSK and
XDP_UNREGISTER_XSK, and try to enable zero-copy mode if the driver
supports it.
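
For reference, driver-side handling of the two sub-commands could look
roughly like the sketch below. This is only an illustration, not part of
this patch: mydrv_ndo_bpf(), mydrv_xsk_enable() and mydrv_xsk_disable()
are made-up names, and the usual driver includes (<linux/netdevice.h>)
are assumed; only the netdev_bpf fields used here (command, xsk.rx_parms,
xsk.queue_id) come from this series.

  static int mydrv_ndo_bpf(struct net_device *dev, struct netdev_bpf *bpf)
  {
          switch (bpf->command) {
          case XDP_REGISTER_XSK:
                  /* Attach the socket's Rx buffer pool to the HW queue. */
                  return mydrv_xsk_enable(dev, bpf->xsk.rx_parms,
                                          bpf->xsk.queue_id);
          case XDP_UNREGISTER_XSK:
                  /* Detach the socket and fall back to copy mode. */
                  return mydrv_xsk_disable(dev, bpf->xsk.queue_id);
          default:
                  return -EINVAL;
          }
  }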

Signed-off-by: Björn Töpel <bjorn.to...@intel.com>
---
 net/xdp/xsk.c | 185 +++++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 145 insertions(+), 40 deletions(-)

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index f372c3288301..f05ab825d157 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -29,15 +29,21 @@
 #include <linux/netdevice.h>
 #include <net/sock.h>
 
+#include <net/xdp_sock.h>
+#include <linux/buff_pool.h>
+
 #include "xsk.h"
 #include "xsk_buff.h"
 #include "xsk_ring.h"
+#include "xsk_buff_pool.h"
+#include "xsk_packet_array.h"
 
 #define XSK_UMEM_MIN_FRAME_SIZE 2048
 #define XSK_ARRAY_SIZE 512
 
 struct xsk_info {
        struct xsk_packet_array *pa;
+       struct buff_pool *bp;
        spinlock_t pa_lock;
        struct xsk_queue *q;
        struct xsk_umem *umem;
@@ -56,8 +62,24 @@ struct xdp_sock {
        struct mutex tx_mutex;
        u32 ifindex;
        u16 queue_id;
+       bool zc_mode;
 };
 
+static inline bool xsk_is_zc_cap(struct xdp_sock *xs)
+{
+       return xs->zc_mode;
+}
+
+static void xsk_set_zc_cap(struct xdp_sock *xs)
+{
+       xs->zc_mode = true;
+}
+
+static void xsk_clear_zc_cap(struct xdp_sock *xs)
+{
+       xs->zc_mode = false;
+}
+
 static struct xdp_sock *xdp_sk(struct sock *sk)
 {
        return (struct xdp_sock *)sk;
@@ -323,6 +345,22 @@ static int xsk_init_tx_ring(struct sock *sk, int mr_fd, u32 desc_nr)
        return xsk_init_ring(sk, mr_fd, desc_nr, &xs->tx);
 }
 
+static void xsk_disable_zc(struct xdp_sock *xs)
+{
+       struct netdev_bpf bpf = {};
+
+       if (!xsk_is_zc_cap(xs))
+               return;
+
+       bpf.command = XDP_UNREGISTER_XSK;
+       bpf.xsk.queue_id = xs->queue_id;
+
+       rtnl_lock();
+       (void)xs->dev->netdev_ops->ndo_bpf(xs->dev, &bpf);
+       rtnl_unlock();
+       xsk_clear_zc_cap(xs);
+}
+
 static int xsk_release(struct socket *sock)
 {
        struct sock *sk = sock->sk;
@@ -344,14 +382,22 @@ static int xsk_release(struct socket *sock)
                xs_prev = xs->dev->_rx[xs->queue_id].xs;
                rcu_assign_pointer(xs->dev->_rx[xs->queue_id].xs, NULL);
 
+               xsk_disable_zc(xs);
+
                /* Wait for driver to stop using the xdp socket. */
                synchronize_net();
 
                xskpa_destroy(xs->rx.pa);
-               xskpa_destroy(xs->tx.pa);
-               xsk_umem_destroy(xs_prev->umem);
+               bpool_destroy(xs->rx.bp);
                xskq_destroy(xs_prev->rx.q);
+               xsk_buff_info_destroy(xs->rx.buff_info);
+
+               xskpa_destroy(xs->tx.pa);
                xskq_destroy(xs_prev->tx.q);
+               xsk_buff_info_destroy(xs->tx.buff_info);
+
+               xsk_umem_destroy(xs_prev->umem);
+
                kobject_put(&xs_prev->dev->_rx[xs->queue_id].kobj);
                dev_put(xs_prev->dev);
        }
@@ -365,6 +411,45 @@ static int xsk_release(struct socket *sock)
        return 0;
 }
 
+static int xsk_dma_map_pool_cb(struct buff_pool *pool, struct device *dev,
+                              enum dma_data_direction dir,
+                              unsigned long attrs)
+{
+       struct xsk_buff_pool *bp = (struct xsk_buff_pool *)pool->pool;
+
+       return xsk_buff_dma_map(bp->bi, dev, dir, attrs);
+}
+
+static void xsk_error_report(void *ctx, int err)
+{
+       struct xdp_sock *xs = (struct xdp_sock *)ctx;
+}
+
+static void xsk_try_enable_zc(struct xdp_sock *xs)
+{
+       struct xsk_rx_parms rx_parms = {};
+       struct netdev_bpf bpf = {};
+       int err;
+
+       if (!xs->dev->netdev_ops->ndo_bpf)
+               return;
+
+       rx_parms.buff_pool = xs->rx.bp;
+       rx_parms.dma_map = xsk_dma_map_pool_cb;
+       rx_parms.error_report_ctx = xs;
+       rx_parms.error_report = xsk_error_report;
+
+       bpf.command = XDP_REGISTER_XSK;
+       bpf.xsk.rx_parms = &rx_parms;
+       bpf.xsk.queue_id = xs->queue_id;
+
+       rtnl_lock();
+       err = xs->dev->netdev_ops->ndo_bpf(xs->dev, &bpf);
+       rtnl_unlock();
+       if (!err)
+               xsk_set_zc_cap(xs);
+}
+
 static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 {
        struct sockaddr_xdp *sxdp = (struct sockaddr_xdp *)addr;
@@ -429,6 +514,13 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
                goto out_rx_pa;
        }
 
+       /* ...and Rx buffer pool is used for zerocopy. */
+       xs->rx.bp = xsk_buff_pool_create(xs->rx.buff_info, xs->rx.q);
+       if (!xs->rx.bp) {
+               err = -ENOMEM;
+               goto out_rx_bp;
+       }
+
        /* Tx */
        xs->tx.buff_info = xsk_buff_info_create(xs->tx.umem);
        if (!xs->tx.buff_info) {
@@ -446,12 +538,17 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 
        rcu_assign_pointer(dev->_rx[sxdp->sxdp_queue_id].xs, xs);
 
+       xsk_try_enable_zc(xs);
+
        goto out_unlock;
 
 out_tx_pa:
        xsk_buff_info_destroy(xs->tx.buff_info);
        xs->tx.buff_info = NULL;
 out_tx_bi:
+       bpool_destroy(xs->rx.bp);
+       xs->rx.bp = NULL;
+out_rx_bp:
        xskpa_destroy(xs->rx.pa);
        xs->rx.pa = NULL;
 out_rx_pa:
@@ -509,27 +606,16 @@ int xsk_generic_rcv(struct xdp_buff *xdp)
 }
 EXPORT_SYMBOL_GPL(xsk_generic_rcv);
 
-struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
+static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
 {
        u32 len = xdp->data_end - xdp->data;
        struct xsk_frame_set p;
 
-       rcu_read_lock();
-       if (!xsk)
-               xsk = lookup_xsk(xdp->rxq->dev, xdp->rxq->queue_index);
-       if (unlikely(!xsk)) {
-               rcu_read_unlock();
-               return ERR_PTR(-EINVAL);
-       }
-
-       if (!xskpa_next_frame_populate(xsk->rx.pa, &p)) {
-               rcu_read_unlock();
-               return ERR_PTR(-ENOSPC);
-       }
+       if (!xskpa_next_frame_populate(xs->rx.pa, &p))
+               return -ENOSPC;
 
        memcpy(xskf_get_data(&p), xdp->data, len);
        xskf_set_frame_no_offset(&p, len, true);
-       rcu_read_unlock();
 
        /* We assume that the semantic of xdp_do_redirect is such that
         * ndo_xdp_xmit will decrease the refcount of the page when it
@@ -540,41 +626,60 @@ struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
         */
        page_frag_free(xdp->data);
 
-       return xsk;
+       return 0;
 }
-EXPORT_SYMBOL_GPL(xsk_rcv);
 
-int xsk_zc_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
+static void __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp)
 {
-       u32 offset = xdp->data - xdp->data_hard_start;
-       u32 len = xdp->data_end - xdp->data;
-       struct xsk_frame_set p;
+       struct xsk_buff *b = (struct xsk_buff *)xdp->bp_handle;
 
-       /* We do not need any locking here since we are guaranteed
-        * a single producer and a single consumer.
-        */
-       if (xskpa_next_frame_populate(xsk->rx.pa, &p)) {
-               xskf_set_frame(&p, len, offset, true);
-               return 0;
-       }
-
-       /* No user-space buffer to put the packet in. */
-       return -ENOSPC;
+       xskq_enq_lazy(xs->rx.q, b->id, xdp->data_end - xdp->data,
+                     b->offset + (xdp->data - xdp->data_hard_start));
 }
-EXPORT_SYMBOL_GPL(xsk_zc_rcv);
 
-void xsk_flush(struct xdp_sock *xsk)
+struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
 {
+       int err = 0;
+
        rcu_read_lock();
-       if (!xsk)
-               xsk = lookup_xsk(xsk->dev, xsk->queue_id);
-       if (unlikely(!xsk)) {
-               rcu_read_unlock();
-               return;
+
+       if (!xsk) {
+               xsk = lookup_xsk(xdp->rxq->dev, xdp->rxq->queue_index);
+               if (!xsk) {
+                       err = -EINVAL;
+                       goto out;
+               }
        }
 
-       WARN_ON_ONCE(xskpa_flush(xsk->rx.pa));
+       /* XXX Ick, this is very hacky. Need a better solution */
+       if (xdp->rxq->bpool)
+               __xsk_rcv_zc(xsk, xdp);
+       else
+               err = __xsk_rcv(xsk, xdp);
+
+out:
        rcu_read_unlock();
+
+       return err ? ERR_PTR(err) : xsk;
+}
+EXPORT_SYMBOL_GPL(xsk_rcv);
+
+static void __xsk_flush(struct xdp_sock *xs)
+{
+       WARN_ON_ONCE(xskpa_flush(xs->rx.pa));
+}
+
+static void __xsk_flush_zc(struct xdp_sock *xs)
+{
+       xskq_enq_flush(xs->rx.q);
+}
+
+void xsk_flush(struct xdp_sock *xsk)
+{
+       if (xsk_is_zc_cap(xsk))
+               __xsk_flush_zc(xsk);
+       else
+               __xsk_flush(xsk);
 }
 EXPORT_SYMBOL_GPL(xsk_flush);
 
-- 
2.14.1
