While finalize() is MT-safe and can be called from multiple places:
from 'acquire()' for next stage or even from consumer's 'dequeue()',
doing so usually creates extra unnecessary contention and might
slow down ring operations, especially for the cases when we have
multiple threads doing acquire/release for some stage.
Things become worse when we have single producer/consumer and multiple
workers doing acquire/release for the same ring.
So instead of calling finalize() from different places, just make
release() for given stage to always try to perform it.

According to the soring_stress_autotest, for multiple workers (8+)
it reduces number of cycles spent by 1.5x-2.0x factor.
For l3fwd-like workload it improves things by ~20%.
For a small number of workers, no serious change was observed.
As another benefit - it allows to simplify the code.

Signed-off-by: Konstantin Ananyev <[email protected]>
---
 lib/ring/soring.c | 52 ++++++++---------------------------------------
 1 file changed, 9 insertions(+), 43 deletions(-)

diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index 3b90521bdb..fc7fc55a21 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -50,10 +50,8 @@
  * from current stage tail up to its head, check state[] and move stage tail
  * through elements that already are in SORING_ST_FINISH state.
  * Along with that, corresponding state[] values are reset to zero.
- * Note that 'finalize()' for given stage can be done from multiple places:
- * 'release()' for that stage or from 'acquire()' for next stage
- * even from consumer's 'dequeue()' - in case given stage is the last one.
- * So 'finalize()' has to be MT-safe and inside it we have to
+ * Note that 'finalize()' for given stage can be called from multiple threads
+ * in parallel, so 'finalize()' has to be MT-safe and inside it we have to
  * guarantee that only one thread will update state[] and stage's tail values.
  */
 
@@ -284,7 +282,7 @@ soring_dequeue(struct rte_soring *r, void *objs, void *meta,
        uint32_t *available)
 {
        enum rte_ring_sync_type st;
-       uint32_t entries, cons_head, cons_next, n, ns, reqn;
+       uint32_t entries, cons_head, cons_next, n, ns;
 
        RTE_ASSERT(r != NULL && r->nb_stage > 0);
        RTE_ASSERT(meta == NULL || r->meta != NULL);
@@ -293,22 +291,8 @@ soring_dequeue(struct rte_soring *r, void *objs, void 
*meta,
        st = r->cons.ht.sync_type;
 
        /* try to grab exactly @num elems first */
-       n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
+       n = __rte_soring_move_cons_head(r, ns, num, behavior, st,
                        &cons_head, &cons_next, &entries);
-       if (n == 0) {
-               /* try to finalize some elems from previous stage */
-               n = __rte_soring_stage_finalize(&r->stage[ns].sht, ns,
-                       r->state, r->mask, 2 * num);
-               entries += n;
-
-               /* repeat attempt to grab elems */
-               reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
-               if (entries >= reqn)
-                       n = __rte_soring_move_cons_head(r, ns, num, behavior,
-                               st, &cons_head, &cons_next, &entries);
-               else
-                       n = 0;
-       }
 
        /* we have some elems to consume */
        if (n != 0) {
@@ -382,7 +366,7 @@ soring_acquire(struct rte_soring *r, void *objs, void *meta,
        uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
        uint32_t *ftoken, uint32_t *available)
 {
-       uint32_t avail, head, idx, n, next, reqn;
+       uint32_t avail, head, idx, n, next;
        struct soring_stage *pstg;
        struct soring_stage_headtail *cons;
 
@@ -399,22 +383,7 @@ soring_acquire(struct rte_soring *r, void *objs, void 
*meta,
 
                /* try to grab exactly @num elems */
                n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
-                       RTE_RING_QUEUE_FIXED, &head, &next, &avail);
-               if (n == 0) {
-                       /* try to finalize some elems from previous stage */
-                       n = __rte_soring_stage_finalize(&pstg->sht, stage - 1,
-                               r->state, r->mask, 2 * num);
-                       avail += n;
-
-                       /* repeat attempt to grab elems */
-                       reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
-                       if (avail >= reqn)
-                               n = __rte_soring_stage_move_head(cons,
-                                       &pstg->ht, 0, num, behavior, &head,
-                                       &next, &avail);
-                       else
-                               n = 0;
-               }
+                       behavior, &head, &next, &avail);
        }
 
        if (n != 0) {
@@ -442,7 +411,7 @@ static __rte_always_inline void
 soring_release(struct rte_soring *r, const void *objs,
        const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
 {
-       uint32_t idx, pos, tail;
+       uint32_t idx, pos;
        struct soring_stage *stg;
        union soring_state st;
 
@@ -479,11 +448,8 @@ soring_release(struct rte_soring *r, const void *objs,
        rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
                        rte_memory_order_relaxed);
 
-       /* try to do finalize(), if appropriate */
-       tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
-                       rte_memory_order_relaxed);
-       if (tail == pos)
-               __rte_soring_stage_finalize(&stg->sht, stage, r->state, r->mask,
+       /* try to do finalize(), if possible */
+       __rte_soring_stage_finalize(&stg->sht, stage, r->state, r->mask,
                                r->capacity);
 }
 
-- 
2.51.0

Reply via email to