Remove the RTE_USE_C11_MEM_MODEL build switch; C11 atomics are now the default for all platforms. This unifies __rte_ring_update_tail into the C11 form (a store-release via rte_atomic_store_explicit replaces the older rte_smp_wmb/rte_smp_rmb + plain store on the generic path) and renames rte_ring_generic_pvt.h to rte_ring_x86_pvt.h to reflect its new scope.
Also splits the head-move helper into separate ST and MT variants, removing the runtime is_st branch from the MT retry loop. This gives a small performance boost and scopes the following exception more tightly. Exception: on x86 with GCC, atomic_compare_exchange on the head CAS regresses MP/MC contended throughput by ~20% compared to the existing hand-written cmpxchg. As a workaround, GCC-on-x86 builds use the older __sync_bool_compare_and_swap builtin, which generates equivalent code to the original asm. This can be reverted if/when GCC gets fixed; a similar issue was observed in the Linux kernel. Signed-off-by: Stephen Hemminger <[email protected]> --- lib/ring/meson.build | 2 +- lib/ring/rte_ring_c11_pvt.h | 75 +++-------- lib/ring/rte_ring_elem_pvt.h | 125 ++++++++++++++++-- ..._ring_generic_pvt.h => rte_ring_x86_pvt.h} | 61 ++------- lib/ring/soring.c | 15 ++- 5 files changed, 158 insertions(+), 120 deletions(-) rename lib/ring/{rte_ring_generic_pvt.h => rte_ring_x86_pvt.h} (60%) diff --git a/lib/ring/meson.build b/lib/ring/meson.build index 21f2c12989..b178c963b8 100644 --- a/lib/ring/meson.build +++ b/lib/ring/meson.build @@ -9,7 +9,7 @@ indirect_headers += files ( 'rte_ring_elem.h', 'rte_ring_elem_pvt.h', 'rte_ring_c11_pvt.h', - 'rte_ring_generic_pvt.h', + 'rte_ring_x86_pvt.h', 'rte_ring_hts.h', 'rte_ring_hts_elem_pvt.h', 'rte_ring_peek.h', diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h index 07b6efc416..3efe011f08 100644 --- a/lib/ring/rte_ring_c11_pvt.h +++ b/lib/ring/rte_ring_c11_pvt.h @@ -15,35 +15,10 @@ * @file rte_ring_c11_pvt.h * It is not recommended to include this file directly, * include <rte_ring.h> instead. - * Contains internal helper functions for MP/SP and MC/SC ring modes. + * Contains internal helper functions for MP and MC ring modes. * For more information please refer to <rte_ring.h>. */ -/** - * @internal This function updates tail values. 
- */ -static __rte_always_inline void -__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, - uint32_t new_val, uint32_t single, uint32_t enqueue) -{ - RTE_SET_USED(enqueue); - - /* - * If there are other enqueues/dequeues in progress that preceded us, - * we need to wait for them to complete - */ - if (!single) - rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val, - rte_memory_order_relaxed); - - /* - * R0: Establishes a synchronizing edge with load-acquire of tail at A1. - * Ensures that memory effects by this thread on ring elements array - * is observed by a different thread of the other type. - */ - rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release); -} - /** * @internal This is a helper function that moves the producer/consumer head * @@ -72,14 +47,11 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only */ static __rte_always_inline unsigned int -__rte_ring_headtail_move_head(struct rte_ring_headtail *d, +__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d, const struct rte_ring_headtail *s, uint32_t capacity, - unsigned int is_st, unsigned int n, - enum rte_ring_queue_behavior behavior, + unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { - uint32_t stail; - int success; unsigned int max = n; /* @@ -89,8 +61,7 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, * d->head. * If not, an unsafe partial order may ensue. */ - *old_head = rte_atomic_load_explicit(&d->head, - rte_memory_order_acquire); + *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_acquire); do { /* Reset n to the initial burst count */ n = max; @@ -101,15 +72,14 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, * ring elements array is observed by the time * this thread observes its tail update. 
*/ - stail = rte_atomic_load_explicit(&s->tail, - rte_memory_order_acquire); + uint32_t stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire); /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * *old_head > s->tail). So 'entries' is always between 0 * and capacity (which is < size). */ - *entries = (capacity + stail - *old_head); + *entries = capacity + stail - *old_head; /* check that we have enough room in ring */ if (unlikely(n > *entries)) @@ -120,25 +90,20 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, return 0; *new_head = *old_head + n; - if (is_st) { - d->head = *new_head; - success = 1; - } else - /* on failure, *old_head is updated */ - /* - * R1/A2. - * R1: Establishes a synchronizing edge with A0 of a - * different thread. - * A2: Establishes a synchronizing edge with R1 of a - * different thread to observe same value for stail - * observed by that thread on CAS failure (to retry - * with an updated *old_head). - */ - success = rte_atomic_compare_exchange_strong_explicit( - &d->head, old_head, *new_head, - rte_memory_order_release, - rte_memory_order_acquire); - } while (unlikely(success == 0)); + + /* on failure, *old_head is updated */ + /* + * R1/A2. + * R1: Establishes a synchronizing edge with A0 of a + * different thread. + * A2: Establishes a synchronizing edge with R1 of a + * different thread to observe same value for stail + * observed by that thread on CAS failure (to retry + * with an updated *old_head). 
+ */ + } while (unlikely(!rte_atomic_compare_exchange_strong_explicit( + &d->head, old_head, *new_head, + rte_memory_order_release, rte_memory_order_acquire))); return n; } diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h index 6eafae121f..9d1da12a92 100644 --- a/lib/ring/rte_ring_elem_pvt.h +++ b/lib/ring/rte_ring_elem_pvt.h @@ -299,17 +299,108 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head, cons_head & r->mask, esize, num); } -/* Between load and load. there might be cpu reorder in weak model - * (powerpc/arm). - * There are 2 choices for the users - * 1.use rmb() memory barrier - * 2.use one-direction load_acquire/store_release barrier - * It depends on performance test results. +/** + * @internal This function updates tail values. */ -#ifdef RTE_USE_C11_MEM_MODEL -#include "rte_ring_c11_pvt.h" +static __rte_always_inline void +__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, + uint32_t new_val, uint32_t single, uint32_t enqueue) +{ + RTE_SET_USED(enqueue); + + /* + * If there are other enqueues/dequeues in progress that preceded us, + * we need to wait for them to complete + */ + if (!single) + rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val, + rte_memory_order_relaxed); + + /* + * R0: Establishes a synchronizing edge with load-acquire of tail at A1. + * Ensures that memory effects by this thread on ring elements array + * is observed by a different thread of the other type. + */ + rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release); +} + +/** + * @internal This is a helper function that moves the producer/consumer head + * + * + * This optimized version for single threaded case. + * + * @param d + * A pointer to the headtail structure with head value to be moved + * @param s + * A pointer to the counter-part headtail structure. 
Note that this + * function only reads tail value from it + * @param capacity + * Either ring capacity value (for producer), or zero (for consumer) + * @param n + * The number of elements we want to move head value on + * @param behavior + * RTE_RING_QUEUE_FIXED: Move on a fixed number of items + * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible + * @param old_head + * Returns head value as it was before the move + * @param new_head + * Returns the new head value + * @param entries + * Returns the number of ring entries available BEFORE head was moved + * @return + * Actual number of objects the head was moved on + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only + */ +static __rte_always_inline unsigned int +__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d, + const struct rte_ring_headtail *s, uint32_t capacity, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, uint32_t *entries) +{ + uint32_t stail; + + /* + * A0: Establishes a synchronizing edge with R1. + * Ensure that this thread observes same values + * to stail observed by the thread that updated + * d->head. + * If not, an unsafe partial order may ensue. + */ + *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_acquire); + + /* + * A1: Establishes a synchronizing edge with R0. + * Ensures that other thread's memory effects on + * ring elements array is observed by the time + * this thread observes its tail update. + */ + stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > s->tail). So 'entries' is always between 0 + * and capacity (which is < size). + */ + *entries = capacity + stail - *old_head; + + /* check that we have enough room in ring */ + if (unlikely(n > *entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : *entries; + + if (n > 0) { + *new_head = *old_head + n; + d->head = *new_head; + } + + return n; +} + +/* There are two choices because GCC optimizer does poorly on atomic_compare_exchange */ +#if defined(RTE_TOOLCHAIN_GCC) && defined(RTE_ARCH_X86) +#include "rte_ring_x86_pvt.h" #else -#include "rte_ring_generic_pvt.h" +#include "rte_ring_c11_pvt.h" #endif /** @@ -341,8 +432,12 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, uint32_t *old_head, uint32_t *new_head, uint32_t *free_entries) { - return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity, - is_sp, n, behavior, old_head, new_head, free_entries); + if (is_sp) + return __rte_ring_headtail_move_head_st(&r->prod, &r->cons, r->capacity, + n, behavior, old_head, new_head, free_entries); + else + return __rte_ring_headtail_move_head_mt(&r->prod, &r->cons, r->capacity, + n, behavior, old_head, new_head, free_entries); } /** @@ -374,8 +469,12 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc, uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { - return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0, - is_sc, n, behavior, old_head, new_head, entries); + if (is_sc) + return __rte_ring_headtail_move_head_st(&r->cons, &r->prod, 0, + n, behavior, old_head, new_head, entries); + else + return __rte_ring_headtail_move_head_mt(&r->cons, &r->prod, 0, + n, behavior, old_head, new_head, entries); } /** diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_x86_pvt.h similarity index 60% rename from lib/ring/rte_ring_generic_pvt.h rename to lib/ring/rte_ring_x86_pvt.h index affd2d5ba7..c8de108bbd 100644 --- a/lib/ring/rte_ring_generic_pvt.h +++ b/lib/ring/rte_ring_x86_pvt.h @@ -7,39 +7,19 @@ * Used as BSD-3 Licensed with permission from Kip Macy. 
*/ -#ifndef _RTE_RING_GENERIC_PVT_H_ -#define _RTE_RING_GENERIC_PVT_H_ +#ifndef _RTE_RING_X86_PVT_H_ +#define _RTE_RING_X86_PVT_H_ /** - * @file rte_ring_generic_pvt.h + * @file rte_ring_x86_pvt.h * It is not recommended to include this file directly, * include <rte_ring.h> instead. - * Contains internal helper functions for MP/SP and MC/SC ring modes. - * For more information please refer to <rte_ring.h>. + * + * Contains internal helper functions for MP and MC ring modes. + * It is GCC specific to workaround poor optimizer handling of C11 atomic + * compare_exchange. */ -/** - * @internal This function updates tail values. - */ -static __rte_always_inline void -__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, - uint32_t new_val, uint32_t single, uint32_t enqueue) -{ - if (enqueue) - rte_smp_wmb(); - else - rte_smp_rmb(); - /* - * If there are other enqueues/dequeues in progress that preceded us, - * we need to wait for them to complete - */ - if (!single) - rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val, - rte_memory_order_relaxed); - - ht->tail = new_val; -} - /** * @internal This is a helper function that moves the producer/consumer head * @@ -50,8 +30,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, * function only reads tail value from it * @param capacity * Either ring capacity value (for producer), or zero (for consumer) - * @param is_st - * Indicates whether multi-thread safe path is needed or not * @param n * The number of elements we want to move head value on * @param behavior @@ -68,14 +46,13 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only */ static __rte_always_inline unsigned int -__rte_ring_headtail_move_head(struct rte_ring_headtail *d, +__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d, const struct rte_ring_headtail *s, uint32_t capacity, - unsigned int is_st, unsigned 
int n, + unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { unsigned int max = n; - int success; do { /* Reset n to the initial burst count */ @@ -83,18 +60,13 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, *old_head = d->head; - /* add rmb barrier to avoid load/load reorder in weak - * memory model. It is noop on x86 - */ - rte_smp_rmb(); - /* * The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * *old_head > s->tail). So 'entries' is always between 0 * and capacity (which is < size). */ - *entries = (capacity + s->tail - *old_head); + *entries = capacity + s->tail - *old_head; /* check that we have enough room in ring */ if (unlikely(n > *entries)) @@ -105,15 +77,10 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, return 0; *new_head = *old_head + n; - if (is_st) { - d->head = *new_head; - success = 1; - } else - success = rte_atomic32_cmpset( - (uint32_t *)(uintptr_t)&d->head, - *old_head, *new_head); - } while (unlikely(success == 0)); + } while (unlikely(!__sync_bool_compare_and_swap( + (uint32_t *)(uintptr_t)&d->head, + *old_head, *new_head))); return n; } -#endif /* _RTE_RING_GENERIC_PVT_H_ */ +#endif /* _RTE_RING_X86_PVT_H_ */ diff --git a/lib/ring/soring.c b/lib/ring/soring.c index 3b90521bdb..0e8bbc03c1 100644 --- a/lib/ring/soring.c +++ b/lib/ring/soring.c @@ -135,9 +135,12 @@ __rte_soring_move_prod_head(struct rte_soring *r, uint32_t num, switch (st) { case RTE_RING_SYNC_ST: + n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht, + r->capacity, num, behavior, head, next, free); + break; case RTE_RING_SYNC_MT: - n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht, - r->capacity, st, num, behavior, head, next, free); + n = __rte_ring_headtail_move_head_mt(&r->prod.ht, &r->cons.ht, + r->capacity, num, behavior, head, next, free); break; case RTE_RING_SYNC_MT_RTS: n = 
__rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht, @@ -168,9 +171,13 @@ __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num, switch (st) { case RTE_RING_SYNC_ST: + n = __rte_ring_headtail_move_head_st(&r->cons.ht, + &r->stage[stage].ht, 0, num, behavior, + head, next, avail); + break; case RTE_RING_SYNC_MT: - n = __rte_ring_headtail_move_head(&r->cons.ht, - &r->stage[stage].ht, 0, st, num, behavior, + n = __rte_ring_headtail_move_head_mt(&r->cons.ht, + &r->stage[stage].ht, 0, num, behavior, head, next, avail); break; case RTE_RING_SYNC_MT_RTS: -- 2.53.0

