On 09/28/2017 02:57 PM, Jesper Dangaard Brouer wrote: [...]
+/* Convert xdp_buff to xdp_pkt */
+static struct xdp_pkt *convert_to_xdp_pkt(struct xdp_buff *xdp)
+{
+        struct xdp_pkt *xdp_pkt;
+        int headroom;
+
+        /* Assure headroom is available for storing info */
+        headroom = xdp->data - xdp->data_hard_start;
+        if (headroom < sizeof(*xdp_pkt))
+                return NULL;
+
+        /* Store info in top of packet */
+        xdp_pkt = xdp->data_hard_start;
(You'd also need to handle data_meta here if it is set, and likewise for cpu_map_build_skb() below; e.g. headroom would then be data_meta - data_hard_start. A rough sketch follows below the quoted hunk.)
+        xdp_pkt->data = xdp->data;
+        xdp_pkt->len = xdp->data_end - xdp->data;
+        xdp_pkt->headroom = headroom - sizeof(*xdp_pkt);
+
+        return xdp_pkt;
+}
+
+static struct sk_buff *cpu_map_build_skb(struct bpf_cpu_map_entry *rcpu,
+                                         struct xdp_pkt *xdp_pkt)
+{
+        unsigned int frame_size;
+        void *pkt_data_start;
+        struct sk_buff *skb;
+
+        /* build_skb need to place skb_shared_info after SKB end, and
+         * also want to know the memory "truesize". Thus, need to
[...]
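For illustration, a rough sketch of the data_meta handling suggested above could look like the following; note the metasize member on struct xdp_pkt is made up here and not part of the quoted patch:

        int metasize = 0;
        int headroom;

        /* Meta area, if any, lives in [data_meta, data) and must not be
         * overwritten by the xdp_pkt struct placed at data_hard_start.
         */
        if (xdp->data_meta < xdp->data)
                metasize = xdp->data - xdp->data_meta;

        headroom = xdp->data - xdp->data_hard_start;
        if (headroom - metasize < sizeof(*xdp_pkt))
                return NULL;

        xdp_pkt = xdp->data_hard_start;
        xdp_pkt->data = xdp->data;
        xdp_pkt->len = xdp->data_end - xdp->data;
        xdp_pkt->headroom = headroom - sizeof(*xdp_pkt);
        xdp_pkt->metasize = metasize;   /* hypothetical new member */

cpu_map_build_skb() could then call skb_metadata_set(skb, xdp_pkt->metasize) once the payload has been skb_put()'ed, so the meta area carries over into the skb.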
 static int cpu_map_kthread_run(void *data)
 {
+        const unsigned long busy_poll_jiffies = usecs_to_jiffies(2000);
+        unsigned long time_limit = jiffies + busy_poll_jiffies;
         struct bpf_cpu_map_entry *rcpu = data;
+        unsigned int empty_cnt = 0;

         set_current_state(TASK_INTERRUPTIBLE);
         while (!kthread_should_stop()) {
+                unsigned int processed = 0, drops = 0;
                 struct xdp_pkt *xdp_pkt;

-                schedule();
-                /* Do work */
-                while ((xdp_pkt = ptr_ring_consume(rcpu->queue))) {
-                        /* For now just "refcnt-free" */
-                        page_frag_free(xdp_pkt);
+                /* Release CPU reschedule checks */
+                if ((time_after_eq(jiffies, time_limit) || empty_cnt > 25) &&
+                    __ptr_ring_empty(rcpu->queue)) {
+                        empty_cnt++;
+                        schedule();
+                        time_limit = jiffies + busy_poll_jiffies;
+                        WARN_ON(smp_processor_id() != rcpu->cpu);
+                } else {
+                        cond_resched();
                 }
+
+                /* Process packets in rcpu->queue */
+                local_bh_disable();
+                /*
+                 * The bpf_cpu_map_entry is single consumer, with this
+                 * kthread CPU pinned. Lockless access to ptr_ring
+                 * consume side valid as no-resize allowed of queue.
+                 */
+                while ((xdp_pkt = __ptr_ring_consume(rcpu->queue))) {
+                        struct sk_buff *skb;
+                        int ret;
+
+                        /* Allow busy polling again */
+                        empty_cnt = 0;
+
+                        skb = cpu_map_build_skb(rcpu, xdp_pkt);
+                        if (!skb) {
+                                page_frag_free(xdp_pkt);
+                                continue;
+                        }
+
+                        /* Inject into network stack */
+                        ret = netif_receive_skb(skb);
Have you looked into whether it's feasible to reuse the GRO engine here as well? (A very rough sketch of one possible direction is at the end of this mail.)
+                        if (ret == NET_RX_DROP)
+                                drops++;
+
+                        /* Limit BH-disable period */
+                        if (++processed == 8)
+                                break;
+                }
+                local_bh_enable();
+                __set_current_state(TASK_INTERRUPTIBLE);
         }
         put_cpu_map_entry(rcpu);
[...]
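On the GRO question above, just to sketch the direction I was thinking of: one way to get a GRO context into a kthread that has no real NAPI instance would be a dummy netdev plus a per-entry napi_struct, roughly as below. All of this is hypothetical (the napi_dev/napi members on bpf_cpu_map_entry don't exist in the patch), and GRO flush timing outside of a normal NAPI poll would need more thought, so take it only as a starting point:

        /* Hypothetical: assumes bpf_cpu_map_entry gains a dummy
         * net_device plus a napi_struct to act as GRO context.
         */
        static int cpu_map_dummy_poll(struct napi_struct *napi, int budget)
        {
                /* Never scheduled; the kthread feeds packets in directly */
                return 0;
        }

        static void cpu_map_gro_init(struct bpf_cpu_map_entry *rcpu)
        {
                init_dummy_netdev(&rcpu->napi_dev);
                netif_napi_add(&rcpu->napi_dev, &rcpu->napi,
                               cpu_map_dummy_poll, NAPI_POLL_WEIGHT);
        }

In the kthread loop, netif_receive_skb(skb) would then become napi_gro_receive(&rcpu->napi, skb), and each batch would be closed with napi_gro_flush(&rcpu->napi, false) before local_bh_enable().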