On 2017-04-17 07:19, Michael S. Tsirkin wrote:
Applications that consume a batch of entries in one go can benefit from the ability to return some of them back into the ring.

Add an API for that - assuming there's space. If there's no space, naturally we can't do this and have to drop entries, but this implies the ring is full, so we'd likely drop some anyway.

Signed-off-by: Michael S. Tsirkin <[email protected]>
---

Jason, in my mind the biggest issue with your batching patchset is the packet drops on disconnect. This API will help avoid that in the common case.
Ok, I will rebase the series on top of this. (Though I don't think we care about the packet loss.)
I would still prefer that we understand what's going on,
I tried to reply in another thread; does that make sense?
and I would like to know what's the smallest batch size that's still helpful,
Yes, I've replied in another thread. The results are:

no batching     1.88Mpps
RX_BATCH=1      1.93Mpps
RX_BATCH=4      2.11Mpps
RX_BATCH=16     2.14Mpps
RX_BATCH=64     2.25Mpps
RX_BATCH=256    2.18Mpps
but I'm not going to block the patch on these grounds assuming packet drops are fixed.
Thanks a lot.
Lightly tested - this is on top of consumer batching patches.

Thanks!

 include/linux/ptr_ring.h | 57 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 783e7f5..5fbeab4 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -457,6 +457,63 @@ static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
 	return 0;
 }
 
+/*
+ * Return entries into ring. Destroy entries that don't fit.
+ *
+ * Note: this is expected to be a rare slow path operation.
+ *
+ * Note: producer lock is nested within consumer lock, so if you
+ * resize you must make sure all uses nest correctly.
+ * In particular if you consume ring in interrupt or BH context, you must
+ * disable interrupts/BH when doing so.
+ */
+static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
+				      void (*destroy)(void *))
+{
+	unsigned long flags;
+	int head;
+
+	spin_lock_irqsave(&(r)->consumer_lock, flags);
+	spin_lock(&(r)->producer_lock);
+
+	if (!r->size)
+		goto done;
+
+	/*
+	 * Clean out buffered entries (for simplicity). This way following code
+	 * can test entries for NULL and if not assume they are valid.
+	 */
+	head = r->consumer_head - 1;
+	while (likely(head >= r->consumer_tail))
+		r->queue[head--] = NULL;
+	r->consumer_tail = r->consumer_head;
+
+	/*
+	 * Go over entries in batch, start moving head back and copy entries.
+	 * Stop when we run into previously unconsumed entries.
+	 */
+	while (n--) {
+		head = r->consumer_head - 1;
+		if (head < 0)
+			head = r->size - 1;
+		if (r->queue[head]) {
+			/* This batch entry will have to be destroyed. */
+			++n;
+			goto done;
+		}
+		r->queue[head] = batch[n];
+		r->consumer_tail = r->consumer_head = head;
+	}
+
+done:
+	/* Destroy all entries left in the batch. */
+	while (n--) {
+		destroy(batch[n]);
+	}
+	spin_unlock(&(r)->producer_lock);
+	spin_unlock_irqrestore(&(r)->consumer_lock, flags);
+}
+
 static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
 					   int size, gfp_t gfp,
 					   void (*destroy)(void *))
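
For anyone following the index arithmetic, here is a minimal userspace sketch of the walk-back idea in the patch above. It is not the kernel code: `toy_ring`, `toy_unconsume`, `toy_count_destroy` and `toy_destroyed` are hypothetical names, the model is single-threaded (no consumer/producer locks), and the batched-consume `consumer_tail` bookkeeping is left out. It only illustrates how the consumer head moves backwards over empty slots and how leftovers are handed to `destroy`.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for struct ptr_ring (no locking). */
struct toy_ring {
	int consumer_head;	/* index of the next slot to consume */
	int size;		/* number of slots in queue */
	void **queue;		/* a NULL slot is empty */
};

/* Hypothetical destroy callback: just counts how often it is called. */
static int toy_destroyed;

static void toy_count_destroy(void *ptr)
{
	(void)ptr;
	toy_destroyed++;
}

/*
 * Put the last entries of batch[0..n-1] back in front of the consumer,
 * walking the head backwards (with wrap-around) while slots are empty.
 * Entries that do not fit are passed to destroy().
 * Invariant: n is always the number of batch entries not yet handled.
 */
static void toy_unconsume(struct toy_ring *r, void **batch, int n,
			  void (*destroy)(void *))
{
	int head;

	while (r->size && n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head])
			break;	/* hit an unconsumed entry: no room left */
		r->queue[head] = batch[--n];
		r->consumer_head = head;
	}

	/* Whatever is left in the batch could not be returned. */
	while (n)
		destroy(batch[--n]);
}
```

With a 4-slot ring whose slots 2 and 3 are still occupied and `consumer_head == 2`, returning a 3-entry batch puts `batch[2]` and `batch[1]` into slots 1 and 0 (so their relative order is preserved for the next consume) and destroys `batch[0]`. The sketch counts down with `while (n)` and `batch[--n]` rather than `while (n--)`; with the post-decrement form, `n` is -1 once the copy loop finishes normally, and a following `while (n--)` cleanup loop would treat that as nonzero.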
