From: Björn Töpel <bjorn.to...@intel.com>

Start making use of the new ndo_bpf sub-commands and try to enable
zero copy, if available. At bind time the socket hands its Rx buffer
pool to the driver via XDP_REGISTER_XSK; if the driver accepts, the
socket operates in zero-copy mode, otherwise it silently falls back
to the copy path. At socket release, XDP_UNREGISTER_XSK returns the
queue to normal operation.
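For reference, the driver side of this handshake could look roughly
like the fragment below. This is an illustration only, not part of
this patch: the mydrv_* names are made up, and only the netdev_bpf
plumbing (command, xsk.queue_id, xsk.rx_parms) comes from this series.

	static int mydrv_bpf(struct net_device *dev, struct netdev_bpf *bpf)
	{
		struct mydrv_priv *priv = netdev_priv(dev);

		switch (bpf->command) {
		case XDP_REGISTER_XSK:
			/* Adopt the socket's Rx buffer pool for this queue;
			 * rx_parms carries the pool, a DMA mapping callback
			 * and an error-report callback. mydrv_enable_zc() is
			 * a hypothetical helper.
			 */
			return mydrv_enable_zc(priv, bpf->xsk.queue_id,
					       bpf->xsk.rx_parms);
		case XDP_UNREGISTER_XSK:
			/* Revert the queue to driver-owned buffers. */
			return mydrv_disable_zc(priv, bpf->xsk.queue_id);
		default:
			/* Other XDP commands elided for brevity. */
			return -EINVAL;
		}
	}

A non-zero return from XDP_REGISTER_XSK simply leaves the socket in
copy mode, so drivers without zero-copy support need no changes.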
Signed-off-by: Björn Töpel <bjorn.to...@intel.com>
---
 net/xdp/xsk.c | 185 +++++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 145 insertions(+), 40 deletions(-)

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index f372c3288301..f05ab825d157 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -29,15 +29,21 @@
 #include <linux/netdevice.h>
 #include <net/sock.h>
+#include <net/xdp_sock.h>
+#include <linux/buff_pool.h>
+
 #include "xsk.h"
 #include "xsk_buff.h"
 #include "xsk_ring.h"
+#include "xsk_buff_pool.h"
+#include "xsk_packet_array.h"
 
 #define XSK_UMEM_MIN_FRAME_SIZE 2048
 #define XSK_ARRAY_SIZE 512
 
 struct xsk_info {
 	struct xsk_packet_array *pa;
+	struct buff_pool *bp;
 	spinlock_t pa_lock;
 	struct xsk_queue *q;
 	struct xsk_umem *umem;
@@ -56,8 +62,24 @@ struct xdp_sock {
 	struct mutex tx_mutex;
 	u32 ifindex;
 	u16 queue_id;
+	bool zc_mode;
 };
 
+static inline bool xsk_is_zc_cap(struct xdp_sock *xs)
+{
+	return xs->zc_mode;
+}
+
+static void xsk_set_zc_cap(struct xdp_sock *xs)
+{
+	xs->zc_mode = true;
+}
+
+static void xsk_clear_zc_cap(struct xdp_sock *xs)
+{
+	xs->zc_mode = false;
+}
+
 static struct xdp_sock *xdp_sk(struct sock *sk)
 {
 	return (struct xdp_sock *)sk;
@@ -323,6 +345,22 @@ static int xsk_init_tx_ring(struct sock *sk, int mr_fd, u32 desc_nr)
 	return xsk_init_ring(sk, mr_fd, desc_nr, &xs->tx);
 }
 
+static void xsk_disable_zc(struct xdp_sock *xs)
+{
+	struct netdev_bpf bpf = {};
+
+	if (!xsk_is_zc_cap(xs))
+		return;
+
+	bpf.command = XDP_UNREGISTER_XSK;
+	bpf.xsk.queue_id = xs->queue_id;
+
+	rtnl_lock();
+	(void)xs->dev->netdev_ops->ndo_bpf(xs->dev, &bpf);
+	rtnl_unlock();
+	xsk_clear_zc_cap(xs);
+}
+
 static int xsk_release(struct socket *sock)
 {
 	struct sock *sk = sock->sk;
@@ -344,14 +382,22 @@ static int xsk_release(struct socket *sock)
 		xs_prev = xs->dev->_rx[xs->queue_id].xs;
 		rcu_assign_pointer(xs->dev->_rx[xs->queue_id].xs, NULL);
 
+		xsk_disable_zc(xs);
+
 		/* Wait for driver to stop using the xdp socket.
 		 */
 		synchronize_net();
 
 		xskpa_destroy(xs->rx.pa);
-		xskpa_destroy(xs->tx.pa);
-		xsk_umem_destroy(xs_prev->umem);
+		bpool_destroy(xs->rx.bp);
 		xskq_destroy(xs_prev->rx.q);
+		xsk_buff_info_destroy(xs->rx.buff_info);
+
+		xskpa_destroy(xs->tx.pa);
 		xskq_destroy(xs_prev->tx.q);
+		xsk_buff_info_destroy(xs->tx.buff_info);
+
+		xsk_umem_destroy(xs_prev->umem);
+
 		kobject_put(&xs_prev->dev->_rx[xs->queue_id].kobj);
 		dev_put(xs_prev->dev);
 	}
@@ -365,6 +411,45 @@ static int xsk_release(struct socket *sock)
 	return 0;
 }
 
+static int xsk_dma_map_pool_cb(struct buff_pool *pool, struct device *dev,
+			       enum dma_data_direction dir,
+			       unsigned long attrs)
+{
+	struct xsk_buff_pool *bp = (struct xsk_buff_pool *)pool->pool;
+
+	return xsk_buff_dma_map(bp->bi, dev, dir, attrs);
+}
+
+static void xsk_error_report(void *ctx, int err)
+{
+	struct xdp_sock *xs = (struct xdp_sock *)ctx;
+}
+
+static void xsk_try_enable_zc(struct xdp_sock *xs)
+{
+	struct xsk_rx_parms rx_parms = {};
+	struct netdev_bpf bpf = {};
+	int err;
+
+	if (!xs->dev->netdev_ops->ndo_bpf)
+		return;
+
+	rx_parms.buff_pool = xs->rx.bp;
+	rx_parms.dma_map = xsk_dma_map_pool_cb;
+	rx_parms.error_report_ctx = xs;
+	rx_parms.error_report = xsk_error_report;
+
+	bpf.command = XDP_REGISTER_XSK;
+	bpf.xsk.rx_parms = &rx_parms;
+	bpf.xsk.queue_id = xs->queue_id;
+
+	rtnl_lock();
+	err = xs->dev->netdev_ops->ndo_bpf(xs->dev, &bpf);
+	rtnl_unlock();
+	if (!err)
+		xsk_set_zc_cap(xs);
+}
+
 static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 {
 	struct sockaddr_xdp *sxdp = (struct sockaddr_xdp *)addr;
@@ -429,6 +514,13 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 		goto out_rx_pa;
 	}
 
+	/* ...and Rx buffer pool is used for zerocopy. */
+	xs->rx.bp = xsk_buff_pool_create(xs->rx.buff_info, xs->rx.q);
+	if (!xs->rx.bp) {
+		err = -ENOMEM;
+		goto out_rx_bp;
+	}
+
 	/* Tx */
 	xs->tx.buff_info = xsk_buff_info_create(xs->tx.umem);
 	if (!xs->tx.buff_info) {
@@ -446,12 +538,17 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 
 	rcu_assign_pointer(dev->_rx[sxdp->sxdp_queue_id].xs, xs);
 
+	xsk_try_enable_zc(xs);
+
 	goto out_unlock;
 
 out_tx_pa:
 	xsk_buff_info_destroy(xs->tx.buff_info);
 	xs->tx.buff_info = NULL;
 out_tx_bi:
+	bpool_destroy(xs->rx.bp);
+	xs->rx.bp = NULL;
+out_rx_bp:
 	xskpa_destroy(xs->rx.pa);
 	xs->rx.pa = NULL;
 out_rx_pa:
@@ -509,27 +606,16 @@ int xsk_generic_rcv(struct xdp_buff *xdp)
 }
 EXPORT_SYMBOL_GPL(xsk_generic_rcv);
 
-struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
+static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
 {
 	u32 len = xdp->data_end - xdp->data;
 	struct xsk_frame_set p;
 
-	rcu_read_lock();
-	if (!xsk)
-		xsk = lookup_xsk(xdp->rxq->dev, xdp->rxq->queue_index);
-	if (unlikely(!xsk)) {
-		rcu_read_unlock();
-		return ERR_PTR(-EINVAL);
-	}
-
-	if (!xskpa_next_frame_populate(xsk->rx.pa, &p)) {
-		rcu_read_unlock();
-		return ERR_PTR(-ENOSPC);
-	}
+	if (!xskpa_next_frame_populate(xs->rx.pa, &p))
+		return -ENOSPC;
 
 	memcpy(xskf_get_data(&p), xdp->data, len);
 	xskf_set_frame_no_offset(&p, len, true);
-	rcu_read_unlock();
 
 	/* We assume that the semantic of xdp_do_redirect is such that
 	 * ndo_xdp_xmit will decrease the refcount of the page when it
@@ -540,41 +626,60 @@ struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
 	 */
 	page_frag_free(xdp->data);
 
-	return xsk;
+	return 0;
 }
-EXPORT_SYMBOL_GPL(xsk_rcv);
 
-int xsk_zc_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
+static void __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp)
 {
-	u32 offset = xdp->data - xdp->data_hard_start;
-	u32 len = xdp->data_end - xdp->data;
-	struct xsk_frame_set p;
+	struct xsk_buff *b = (struct xsk_buff *)xdp->bp_handle;
 
-	/* We do not need any locking here since we are guaranteed
-	 * a single producer and a single consumer.
-	 */
-	if (xskpa_next_frame_populate(xsk->rx.pa, &p)) {
-		xskf_set_frame(&p, len, offset, true);
-		return 0;
-	}
-
-	/* No user-space buffer to put the packet in. */
-	return -ENOSPC;
+	xskq_enq_lazy(xs->rx.q, b->id, xdp->data_end - xdp->data,
+		      b->offset + (xdp->data - xdp->data_hard_start));
 }
-EXPORT_SYMBOL_GPL(xsk_zc_rcv);
 
-void xsk_flush(struct xdp_sock *xsk)
+struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
 {
+	int err = 0;
+
 	rcu_read_lock();
-	if (!xsk)
-		xsk = lookup_xsk(xsk->dev, xsk->queue_id);
-	if (unlikely(!xsk)) {
-		rcu_read_unlock();
-		return;
+
+	if (!xsk) {
+		xsk = lookup_xsk(xdp->rxq->dev, xdp->rxq->queue_index);
+		if (!xsk) {
+			err = -EINVAL;
+			goto out;
+		}
 	}
 
-	WARN_ON_ONCE(xskpa_flush(xsk->rx.pa));
+	/* XXX Ick, this is very hacky. Need a better solution */
+	if (xdp->rxq->bpool)
+		__xsk_rcv_zc(xsk, xdp);
+	else
+		err = __xsk_rcv(xsk, xdp);
+
+out:
 	rcu_read_unlock();
+
+	return err ? ERR_PTR(err) : xsk;
+}
+EXPORT_SYMBOL_GPL(xsk_rcv);
+
+static void __xsk_flush(struct xdp_sock *xs)
+{
+	WARN_ON_ONCE(xskpa_flush(xs->rx.pa));
+}
+
+static void __xsk_flush_zc(struct xdp_sock *xs)
+{
+	xskq_enq_flush(xs->rx.q);
+}
+
+void xsk_flush(struct xdp_sock *xsk)
+{
+	if (xsk_is_zc_cap(xsk))
+		__xsk_flush_zc(xsk);
+	else
+		__xsk_flush(xsk);
 }
 EXPORT_SYMBOL_GPL(xsk_flush);
-- 
2.14.1