From: Magnus Karlsson <magnus.karls...@intel.com>

Here, Tx support is added. The user fills the Tx queue with frames to be
sent by the kernel, and lets the kernel know using the sendmsg syscall.
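
For illustration, a minimal userspace sketch of that flow is shown below.
It assumes that socket creation, UMEM registration and the Tx ring mmap
from the earlier patches in this series have already been done and that
descriptors have been written into the Tx ring; kick_tx() is a
hypothetical helper used only for this example, not part of the patch.

  #include <errno.h>
  #include <string.h>
  #include <sys/socket.h>

  /* Tell the kernel that there are frames in the Tx ring to send. No
   * payload is passed through sendmsg() itself; the descriptors already
   * sit in the shared Tx ring. MSG_DONTWAIT must be set, since blocking
   * operation returns -EOPNOTSUPP in this patch.
   */
  static int kick_tx(int xsk_fd)
  {
          struct msghdr msg;
          int ret;

          memset(&msg, 0, sizeof(msg));

          ret = sendmsg(xsk_fd, &msg, MSG_DONTWAIT);
          if (ret < 0 && errno != EAGAIN)
                  return -errno;

          return 0;
  }
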
Signed-off-by: Magnus Karlsson <magnus.karls...@intel.com>
---
 net/xdp/xsk.c       | 151 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 net/xdp/xsk_queue.h |  33 ++++++++++++
 2 files changed, 183 insertions(+), 1 deletion(-)

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index a9bceb0958d8..685b6f360628 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -35,6 +35,7 @@
 #include "xsk_queue.h"
 #include "xdp_umem.h"
 
+#define TX_BATCH_SIZE 16
 #define RX_BATCH_SIZE 16
 
 struct xdp_sock {
@@ -44,10 +45,12 @@ struct xdp_sock {
 	struct xskq_iter rx_it;
 	u64 rx_dropped;
 	struct xsk_queue *tx;
+	struct xskq_iter tx_it;
 	struct net_device *dev;
 	/* Protects multiple processes in the control path */
 	struct mutex mutex;
 	struct xdp_umem *umem;
+	u32 tx_umem_head;
 	u32 ifindex;
 	u16 queue_id;
 };
@@ -112,6 +115,146 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
 	return err;
 }
 
+static void xsk_destruct_skb(struct sk_buff *skb)
+{
+	u32 buff_id = (u32)(long)skb_shinfo(skb)->destructor_arg;
+	struct xdp_sock *xs = xdp_sk(skb->sk);
+
+	WARN_ON_ONCE(xdp_umem_enq_one(xs->umem->cq, buff_id));
+
+	sock_wfree(skb);
+}
+
+static int xsk_xmit_skb(struct sk_buff *skb)
+{
+	struct net_device *dev = skb->dev;
+	struct sk_buff *orig_skb = skb;
+	struct netdev_queue *txq;
+	int ret = NETDEV_TX_BUSY;
+	bool again = false;
+
+	if (unlikely(!netif_running(dev) || !netif_carrier_ok(dev)))
+		goto drop;
+
+	skb = validate_xmit_skb_list(skb, dev, &again);
+	if (skb != orig_skb)
+		return NET_XMIT_DROP;
+
+	txq = skb_get_tx_queue(dev, skb);
+
+	local_bh_disable();
+
+	HARD_TX_LOCK(dev, txq, smp_processor_id());
+	if (!netif_xmit_frozen_or_drv_stopped(txq))
+		ret = netdev_start_xmit(skb, dev, txq, false);
+	HARD_TX_UNLOCK(dev, txq);
+
+	local_bh_enable();
+
+	if (!dev_xmit_complete(ret))
+		goto out_err;
+
+	return ret;
+drop:
+	atomic_long_inc(&dev->tx_dropped);
+out_err:
+	return NET_XMIT_DROP;
+}
+
+static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
+			    size_t total_len)
+{
+	bool need_wait = !(m->msg_flags & MSG_DONTWAIT);
+	struct xdp_sock *xs = xdp_sk(sk);
+	bool sent_frame = false;
+	struct sk_buff *skb;
+	u32 max_outstanding;
+	int err = 0;
+
+	if (unlikely(!xs->tx))
+		return -ENOBUFS;
+	if (need_wait)
+		return -EOPNOTSUPP;
+
+	mutex_lock(&xs->mutex);
+
+	max_outstanding = xskq_nb_free(xs->umem->cq, xs->tx_umem_head,
+				       TX_BATCH_SIZE);
+
+	while (xskq_next_frame_deq(xs->tx, &xs->tx_it, TX_BATCH_SIZE)) {
+		char *buffer;
+		u32 id, len;
+
+		if (max_outstanding-- == 0) {
+			err = -EAGAIN;
+			goto out_err;
+		}
+
+		len = xskq_rxtx_get_len(&xs->tx_it);
+		if (unlikely(len > xs->dev->mtu)) {
+			err = -EMSGSIZE;
+			goto out_err;
+		}
+
+		skb = sock_alloc_send_skb(sk, len, !need_wait, &err);
+		if (unlikely(!skb)) {
+			err = -EAGAIN;
+			goto out_err;
+		}
+
+		skb_put(skb, len);
+		id = xskq_rxtx_get_id(&xs->tx_it);
+		buffer = xdp_umem_get_data(xs->umem, id) +
+			 xskq_rxtx_get_offset(&xs->tx_it);
+		err = skb_store_bits(skb, 0, buffer, len);
+		if (unlikely(err))
+			goto out_store;
+
+		skb->dev = xs->dev;
+		skb->priority = sk->sk_priority;
+		skb->mark = sk->sk_mark;
+		skb_set_queue_mapping(skb, xs->queue_id);
+		skb_shinfo(skb)->destructor_arg = (void *)(long)id;
+		skb->destructor = xsk_destruct_skb;
+
+		err = xsk_xmit_skb(skb);
+		/* Ignore NET_XMIT_CN as packet might have been sent */
+		if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) {
+			err = -EAGAIN;
+			goto out_store;
+		}
+
+		xs->tx_umem_head++;
+		sent_frame = true;
+	}
+
+	goto out;
+
+out_store:
+	kfree_skb(skb);
+out_err:
+	xskq_deq_return_frame(&xs->tx_it);
+out:
+	if (sent_frame)
+		sk->sk_write_space(sk);
+
+	mutex_unlock(&xs->mutex);
+	return err;
+}
+
+static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
+{
+	struct sock *sk = sock->sk;
+	struct xdp_sock *xs = xdp_sk(sk);
+
+	if (unlikely(!xs->dev))
+		return -ENXIO;
+	if (unlikely(!(xs->dev->flags & IFF_UP)))
+		return -ENETDOWN;
+
+	return xsk_generic_xmit(sk, m, total_len);
+}
+
 static unsigned int xsk_poll(struct file *file, struct socket *sock,
 			     struct poll_table_struct *wait)
 {
@@ -121,6 +264,8 @@ static unsigned int xsk_poll(struct file *file, struct socket *sock,
 
 	if (xs->rx && !xskq_empty(xs->rx))
 		mask |= POLLIN | POLLRDNORM;
+	if (xs->tx && !xskq_full(xs->tx))
+		mask |= POLLOUT | POLLWRNORM;
 
 	return mask;
 }
@@ -266,6 +411,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 	} else {
 		/* This xsk has its own umem. */
 		xskq_set_umem(xs->umem->fq, &xs->umem->props);
+		xskq_set_umem(xs->umem->cq, &xs->umem->props);
 	}
 
 	/* Rebind? */
@@ -281,8 +427,11 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 	xs->queue_id = sxdp->sxdp_queue_id;
 
 	xskq_init_iter(&xs->rx_it);
+	xskq_init_iter(&xs->tx_it);
+	xs->tx_umem_head = 0;
 
 	xskq_set_umem(xs->rx, &xs->umem->props);
+	xskq_set_umem(xs->tx, &xs->umem->props);
 
 out_unlock:
 	if (err)
@@ -433,7 +582,7 @@ static const struct proto_ops xsk_proto_ops = {
 	.shutdown	= sock_no_shutdown,
 	.setsockopt	= xsk_setsockopt,
 	.getsockopt	= sock_no_getsockopt,
-	.sendmsg	= sock_no_sendmsg,
+	.sendmsg	= xsk_sendmsg,
 	.recvmsg	= sock_no_recvmsg,
 	.mmap		= xsk_mmap,
 	.sendpage	= sock_no_sendpage,
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index af6e651f1207..94edc7e7a503 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -209,6 +209,11 @@ static inline void xskq_deq_return_frame(struct xskq_iter *it)
 	it->tail--;
 }
 
+static inline void xskq_enq_return_frame(struct xsk_queue *q)
+{
+	q->iter_head_idx--;
+}
+
 static inline void xskq_enq_flush(struct xsk_queue *q)
 {
 	/* Order flags and data */
@@ -226,6 +231,21 @@ static inline u32 xskq_rxtx_get_ring_size(struct xsk_queue *q)
 		q->nentries * sizeof(struct xdp_desc));
 }
 
+static inline u32 xskq_rxtx_get_id(struct xskq_iter *it)
+{
+	return it->desc_copy.idx;
+}
+
+static inline u32 xskq_rxtx_get_len(struct xskq_iter *it)
+{
+	return it->desc_copy.len;
+}
+
+static inline u32 xskq_rxtx_get_offset(struct xskq_iter *it)
+{
+	return it->desc_copy.offset;
+}
+
 static inline int xskq_rxtx_enq_frame(struct xsk_queue *q,
 				      u32 id, u32 len, u16 offset)
 {
@@ -258,6 +278,19 @@ static inline u32 xdp_umem_get_id(struct xsk_queue *q,
 	return ring->desc[it->tail & q->ring_mask];
 }
 
+static inline int xdp_umem_enq_one(struct xsk_queue *q, u32 idx)
+{
+	struct xdp_umem_queue *ring = (struct xdp_umem_queue *)q->ring;
+
+	if (xskq_nb_free(q, q->iter_head_idx, 1) == 0)
+		return -ENOSPC;
+
+	ring->desc[q->iter_head_idx++ & q->ring_mask] = idx;
+
+	xskq_enq_flush(q);
+	return 0;
+}
+
 struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
 void xskq_destroy(struct xsk_queue *q_ops);
 
-- 
2.14.1