On Mon, Apr 23, 2018 at 9:56 AM, Björn Töpel <bjorn.to...@gmail.com> wrote:
> From: Magnus Karlsson <magnus.karls...@intel.com>
>
> Here, Tx support is added. The user fills the Tx queue with frames to
> be sent by the kernel, and lets the kernel know using the sendmsg
> syscall.
>
> Signed-off-by: Magnus Karlsson <magnus.karls...@intel.com>

> +static int xsk_xmit_skb(struct sk_buff *skb)

This is basically packet_direct_xmit. It might be better to just move that
function to net/core/dev.c and use it in both AF_PACKET and AF_XDP.

Also, AF_XDP may eventually want to support the regular path through
dev_queue_xmit, so that packets go through traffic shaping.

> +{
> +       struct net_device *dev = skb->dev;
> +       struct sk_buff *orig_skb = skb;
> +       struct netdev_queue *txq;
> +       int ret = NETDEV_TX_BUSY;
> +       bool again = false;
> +
> +       if (unlikely(!netif_running(dev) || !netif_carrier_ok(dev)))
> +               goto drop;
> +
> +       skb = validate_xmit_skb_list(skb, dev, &again);
> +       if (skb != orig_skb)
> +               return NET_XMIT_DROP;

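When validate_xmit_skb_list returns a different skb, the generated segment
list needs to be freed before returning on this error path (e.g. with
kfree_skb_list); see packet_direct_xmit.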

> +
> +       txq = skb_get_tx_queue(dev, skb);
> +
> +       local_bh_disable();
> +
> +       HARD_TX_LOCK(dev, txq, smp_processor_id());
> +       if (!netif_xmit_frozen_or_drv_stopped(txq))
> +               ret = netdev_start_xmit(skb, dev, txq, false);
> +       HARD_TX_UNLOCK(dev, txq);
> +
> +       local_bh_enable();
> +
> +       if (!dev_xmit_complete(ret))
> +               goto out_err;
> +
> +       return ret;
> +drop:
> +       atomic_long_inc(&dev->tx_dropped);
> +out_err:
> +       return NET_XMIT_DROP;
> +}

> +static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
> +                           size_t total_len)
> +{
> +       bool need_wait = !(m->msg_flags & MSG_DONTWAIT);
> +       u32 max_batch = TX_BATCH_SIZE;
> +       struct xdp_sock *xs = xdp_sk(sk);
> +       bool sent_frame = false;
> +       struct xdp_desc desc;
> +       struct sk_buff *skb;
> +       int err = 0;
> +
> +       if (unlikely(!xs->tx))
> +               return -ENOBUFS;
> +       if (need_wait)
> +               return -EOPNOTSUPP;
> +
> +       mutex_lock(&xs->mutex);
> +
> +       while (xskq_peek_desc(xs->tx, &desc)) {

It is possible to pass a chain of skbs to validate_xmit_skb_list and
eventually pass this chain to xsk_xmit_skb, amortizing the cost of
taking the txq lock. Fine to ignore for this patch set.

> +               char *buffer;
> +               u32 id, len;
> +
> +               if (max_batch-- == 0) {
> +                       err = -EAGAIN;
> +                       goto out;
> +               }
> +
> +               if (xskq_reserve_id(xs->umem->cq)) {
> +                       err = -EAGAIN;
> +                       goto out;
> +               }
> +
> +               len = desc.len;
> +               if (unlikely(len > xs->dev->mtu)) {
> +                       err = -EMSGSIZE;
> +                       goto out;
> +               }
> +
> +               skb = sock_alloc_send_skb(sk, len, !need_wait, &err);
> +               if (unlikely(!skb)) {
> +                       err = -EAGAIN;
> +                       goto out;
> +               }
> +
> +               skb_put(skb, len);
> +               id = desc.idx;
> +               buffer = xdp_umem_get_data(xs->umem, id) + desc.offset;
> +               err = skb_store_bits(skb, 0, buffer, len);
> +               if (unlikely(err))
> +                       goto out_store;

As xsk_destruct_skb delays notification until consume_skb is called, this
copy can be avoided by linking the xdp buffer into the skb frags array,
analogous to tpacket_snd.

You probably don't care much about the copy slow path, and this can be
implemented later, so there is also no need to do it in this patchset.

static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
+                                             struct xdp_desc *desc)
+{
+       struct xdp_rxtx_ring *ring;
+
+       if (q->cons_tail == q->cons_head) {
+               WRITE_ONCE(q->ring->consumer, q->cons_tail);
+               q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
+
+               /* Order consumer and data */
+               smp_rmb();
+
+               return xskq_validate_desc(q, desc);
+       }
+
+       ring = (struct xdp_rxtx_ring *)q->ring;
+       *desc = ring->desc[q->cons_tail & q->ring_mask];
+       return desc;

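This only validates descriptors when the branch is taken (i.e. when
cons_tail == cons_head). On the fall-through path the descriptor is copied
out of the ring and returned without going through xskq_validate_desc.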

Reply via email to