On 2021/01/27 3:41, Lorenzo Bianconi wrote:
Introduce bulking support for XDP_PASS verdict forwarding skbs to
the networking stack

Signed-off-by: Lorenzo Bianconi <lore...@kernel.org>
---
  drivers/net/veth.c | 43 ++++++++++++++++++++++++++-----------------
  1 file changed, 26 insertions(+), 17 deletions(-)

diff --git a/drivers/net/veth.c b/drivers/net/veth.c
index 6e03b619c93c..23137d9966da 100644
--- a/drivers/net/veth.c
+++ b/drivers/net/veth.c
@@ -35,6 +35,7 @@
  #define VETH_XDP_HEADROOM     (XDP_PACKET_HEADROOM + NET_IP_ALIGN)
  #define VETH_XDP_TX_BULK_SIZE 16
+#define VETH_XDP_BATCH         8

  struct veth_stats {
        u64     rx_drops;
@@ -787,27 +788,35 @@ static int veth_xdp_rcv(struct veth_rq *rq, int budget,
        int i, done = 0;

        for (i = 0; i < budget; i++) {
-               void *ptr = __ptr_ring_consume(&rq->xdp_ring);
-               struct sk_buff *skb;
+               void *frames[VETH_XDP_BATCH];
+               void *skbs[VETH_XDP_BATCH];
+               int i, n_frame, n_skb = 0;

This 'i' shadows the outer loop counter. I think this may be confusing.
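
To illustrate the concern, here is a minimal userspace sketch (not kernel code) of what the shadowing does. The names shadow_demo and struct shadow_result are made up for this example, and 'batch' only stands in for VETH_XDP_BATCH. The inner declaration hides the loop counter for the rest of the block, so whatever the body does with 'i' never reaches the outer loop.

```c
/* Hypothetical illustration only; none of these names exist in veth.c. */
struct shadow_result {
	int outer_i;		/* value of the outer 'i' after the loop */
	int inner_total;	/* work done through the inner 'i' */
};

struct shadow_result shadow_demo(int budget, int batch)
{
	struct shadow_result res = { 0, 0 };
	int i;

	for (i = 0; i < budget; i++) {
		int i;	/* shadows the loop counter until the closing brace */

		for (i = 0; i < batch; i++)
			res.inner_total++;
	}

	/* Back in the outer scope: this is the loop counter, i.e. budget. */
	res.outer_i = i;
	return res;
}
```

Building with -Wshadow makes gcc and clang warn about the inner declaration.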

-               if (!ptr)
+               n_frame = __ptr_ring_consume_batched(&rq->xdp_ring, frames,
+                                                    VETH_XDP_BATCH);

This apparently exceeds the budget.
Each of the 'budget' iterations can consume up to VETH_XDP_BATCH entries, so this will process budget * VETH_XDP_BATCH packets at most.
(You are probably aware of this because you return 'i' instead of 'done'?)
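
A small userspace model of the arithmetic, as a sketch under assumptions: struct toy_ring, toy_consume() and toy_consume_batched() are hypothetical stand-ins for the ptr_ring and its consume helpers, and BATCH stands in for VETH_XDP_BATCH. The loop has the same shape as the hunk above: one batch per budget iteration.

```c
#include <stddef.h>

#define BATCH 8	/* stands in for VETH_XDP_BATCH */

/* Toy ring: hands out 'avail' non-NULL entries, then NULL. */
struct toy_ring {
	int avail;
};

static void *toy_consume(struct toy_ring *r)
{
	static int token;	/* any non-NULL address will do */

	if (r->avail == 0)
		return NULL;
	r->avail--;
	return &token;
}

/* Batched variant: simply calls the single-entry consume up to n times. */
static int toy_consume_batched(struct toy_ring *r, void **array, int n)
{
	int i;

	for (i = 0; i < n; i++) {
		void *ptr = toy_consume(r);

		if (!ptr)
			break;
		array[i] = ptr;
	}
	return i;
}

/* One batch per budget iteration; returns the number of entries consumed. */
int rcv_batched_per_iteration(struct toy_ring *r, int budget)
{
	void *frames[BATCH];
	int i, processed = 0;

	for (i = 0; i < budget; i++) {
		int n = toy_consume_batched(r, frames, BATCH);

		if (!n)
			break;
		processed += n;
	}
	return processed;
}
```

With 1000 entries queued and a budget of 64, this model consumes 512 entries, i.e. 64 * BATCH rather than 64.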

Also, I'm not sure we need to introduce __ptr_ring_consume_batched() here.
The function just calls __ptr_ring_consume() up to n times.

IIUC, your final code looks like this:

for (budget) {
        n_frame = __ptr_ring_consume_batched(VETH_XDP_BATCH);
        for (n_frame) {
                if (frame is XDP)
                        xdpf[n_xdpf++] = to_xdp(frame);
                else
                        skbs[n_skb++] = frame;
        }

        if (n_xdpf)
                veth_xdp_rcv_batch(xdpf);

        for (n_skb) {
                skb = veth_xdp_rcv_skb(skbs[i]);
                napi_gro_receive(skb);
        }
}

Your code processes VETH_XDP_BATCH packets at a time regardless of whether each of them is an xdp_frame or an skb, but I think you actually want to process VETH_XDP_BATCH xdp_frames at a time?
Then why not do it like this?

for (budget) {
        ptr = __ptr_ring_consume();
        if (ptr is XDP) {
                if (n_xdpf >= VETH_XDP_BATCH) {
                        veth_xdp_rcv_batch(xdpf);
                        n_xdpf = 0;
                }
                xdpf[n_xdpf++] = to_xdp(ptr);
        } else {
                skb = veth_xdp_rcv_skb(ptr);
                napi_gro_receive(skb);
        }
}
if (n_xdpf)
        veth_xdp_rcv_batch(xdpf);
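
The same loop as a compilable userspace sketch, to check that it stays within the budget and never hands over more than VETH_XDP_BATCH frames at once. Everything here is hypothetical: enum toy_kind replaces the real xdp_frame/skb distinction, toy_flush() stands in for veth_xdp_rcv_batch(), and the skb path is reduced to a counter.

```c
#define BATCH 8	/* stands in for VETH_XDP_BATCH */

enum toy_kind { TOY_XDP, TOY_SKB };

struct toy_stats {
	int consumed;	/* ring entries taken; bounded by budget */
	int flushes;	/* calls to the bulk handler */
	int max_bulk;	/* largest bulk handed over in one call */
	int skbs;	/* entries handled one by one on the skb path */
};

/* Stand-in for the bulk handler: only records how it was called. */
static void toy_flush(struct toy_stats *st, int n_xdpf)
{
	st->flushes++;
	if (n_xdpf > st->max_bulk)
		st->max_bulk = n_xdpf;
}

struct toy_stats rcv_one_at_a_time(const enum toy_kind *ring, int ring_len,
				   int budget)
{
	struct toy_stats st = { 0, 0, 0, 0 };
	int i, n_xdpf = 0;

	/* One ring entry per budget iteration. */
	for (i = 0; i < budget && i < ring_len; i++) {
		if (ring[i] == TOY_XDP) {
			/* Flush a full bulk before queueing the next frame. */
			if (n_xdpf >= BATCH) {
				toy_flush(&st, n_xdpf);
				n_xdpf = 0;
			}
			n_xdpf++;
		} else {
			st.skbs++;
		}
		st.consumed++;
	}

	/* Flush whatever is left over. */
	if (n_xdpf)
		toy_flush(&st, n_xdpf);

	return st;
}
```

In this model the number of consumed entries is capped by the budget, and the bulk size is capped by BATCH independently of how many skbs are interleaved.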

Toshiaki Makita
