The receive callback was not safe with multiple queues.
If the callback on one receive queue decides to take a sample,
it needs to add that sample and atomically update the previous
TSC sample value. Add a new lock for that.

Optimize the check for when to take a sample so that
the lock is only taken when a sample is likely to be needed.
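
The shape of that fast path, shown standalone with plain C11 atomics
and an atomic_flag standing in for the spinlock (names, the main()
harness and the simplified '<' comparison are made up for this sketch;
the real code is in the diff below and uses rte_spinlock plus
rte_atomic_load_explicit):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    static _Atomic uint64_t next_tsc;
    static atomic_flag sampling = ATOMIC_FLAG_INIT;

    /* Returns true when this caller took the sample. */
    static bool maybe_sample(uint64_t now, uint64_t interval)
    {
            /* Fast path: one relaxed load, no lock, in the common case.
             * (The real patch uses tsc_before() here so that TSC
             * wraparound is handled; '<' keeps the sketch short.)
             */
            if (now < atomic_load_explicit(&next_tsc, memory_order_relaxed))
                    return false;

            /* Slow path: only one caller wins, the others skip the sample. */
            if (atomic_flag_test_and_set_explicit(&sampling, memory_order_acquire))
                    return false;

            atomic_store_explicit(&next_tsc, now + interval, memory_order_relaxed);
            atomic_flag_clear_explicit(&sampling, memory_order_release);
            return true;
    }

    int main(void)
    {
            /* In the library these would be rte_rdtsc() values. */
            (void)maybe_sample(1000, 500);
            return 0;
    }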

Also, add code to handle TSC wraparound in the comparison.
Perhaps this should move to rte_cycles.h?
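
For instance, with the signed-difference trick a timestamp taken just
before the 64-bit counter wraps still compares as earlier than one
taken just after the wrap (standalone example, tsc_after() copied from
the patch below):

    #include <assert.h>
    #include <stdbool.h>
    #include <stdint.h>

    static inline bool tsc_after(uint64_t t0, uint64_t t1)
    {
            return (int64_t)(t1 - t0) < 0;
    }

    int main(void)
    {
            uint64_t before_wrap = UINT64_MAX - 5;  /* shortly before wraparound */
            uint64_t after_wrap = 10;               /* counter already wrapped */

            /* A plain 'after_wrap > before_wrap' comparison would fail here. */
            assert(tsc_after(after_wrap, before_wrap));
            return 0;
    }

This stays correct as long as the two values are less than 2^63 ticks
apart, which the sampling interval easily guarantees.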

Bugzilla ID: 1723
Signed-off-by: Stephen Hemminger <step...@networkplumber.org>
Fixes: 5cd3cac9ed22 ("latency: added new library for latency stats")
Cc: sta...@dpdk.org
---
 lib/latencystats/rte_latencystats.c | 55 ++++++++++++++++++-----------
 1 file changed, 35 insertions(+), 20 deletions(-)

diff --git a/lib/latencystats/rte_latencystats.c b/lib/latencystats/rte_latencystats.c
index 6873a44a92..72a58d78d1 100644
--- a/lib/latencystats/rte_latencystats.c
+++ b/lib/latencystats/rte_latencystats.c
@@ -22,6 +22,7 @@
 #include <rte_metrics.h>
 #include <rte_spinlock.h>
 #include <rte_string_fns.h>
+#include <rte_stdatomic.h>
 
 #include "rte_latencystats.h"
 
@@ -45,11 +46,20 @@ timestamp_dynfield(struct rte_mbuf *mbuf)
                        timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
 }
 
+/* Compare two 64-bit timer counters but deal with wraparound correctly. */
+static inline bool tsc_after(uint64_t t0, uint64_t t1)
+{
+       return (int64_t)(t1 - t0) < 0;
+}
+
+#define tsc_before(a, b) tsc_after(b, a)
+
 static const char *MZ_RTE_LATENCY_STATS = "rte_latencystats";
 static int latency_stats_index;
+
+static rte_spinlock_t sample_lock = RTE_SPINLOCK_INITIALIZER;
 static uint64_t samp_intvl;
-static uint64_t timer_tsc;
-static uint64_t prev_tsc;
+static RTE_ATOMIC(uint64_t) next_tsc;
 
 #define LATENCY_AVG_SCALE     4
 #define LATENCY_JITTER_SCALE 16
@@ -147,25 +157,29 @@ add_time_stamps(uint16_t pid __rte_unused,
                void *user_cb __rte_unused)
 {
        unsigned int i;
-       uint64_t diff_tsc, now;
-
-       /*
-        * For every sample interval,
-        * time stamp is marked on one received packet.
-        */
-       now = rte_rdtsc();
-       for (i = 0; i < nb_pkts; i++) {
-               diff_tsc = now - prev_tsc;
-               timer_tsc += diff_tsc;
-
-               if ((pkts[i]->ol_flags & timestamp_dynflag) == 0
-                               && (timer_tsc >= samp_intvl)) {
-                       *timestamp_dynfield(pkts[i]) = now;
-                       pkts[i]->ol_flags |= timestamp_dynflag;
-                       timer_tsc = 0;
+       uint64_t now = rte_rdtsc();
+
+       /* Check without locking */
+       if (likely(tsc_before(now, rte_atomic_load_explicit(&next_tsc,
+                                                           rte_memory_order_relaxed))))
+               return nb_pkts;
+
+       /* Try to get sample, skip if sample is being done by another core. */
+       if (likely(rte_spinlock_trylock(&sample_lock))) {
+               for (i = 0; i < nb_pkts; i++) {
+                       struct rte_mbuf *m = pkts[i];
+
+                       /* skip if already timestamped */
+                       if (unlikely(m->ol_flags & timestamp_dynflag))
+                               continue;
+
+                       m->ol_flags |= timestamp_dynflag;
+                       *timestamp_dynfield(m) = now;
+                       rte_atomic_store_explicit(&next_tsc, now + samp_intvl,
+                                                 rte_memory_order_relaxed);
+                       break;
                }
-               prev_tsc = now;
-               now = rte_rdtsc();
+               rte_spinlock_unlock(&sample_lock);
        }
 
        return nb_pkts;
@@ -270,6 +284,7 @@ rte_latencystats_init(uint64_t app_samp_intvl,
        glob_stats = mz->addr;
        rte_spinlock_init(&glob_stats->lock);
        samp_intvl = (uint64_t)(app_samp_intvl * cycles_per_ns);
+       next_tsc = rte_rdtsc();
 
        /** Register latency stats with stats library */
        for (i = 0; i < NUM_LATENCY_STATS; i++)
-- 
2.47.2
