The receive callback was not safe with multiple queues. If one receive queue callback decides to take a sample, it needs to add that sample and atomically update the previous TSC sample value. Add a new lock for that.
Optimize the check for when to take sample so that it only needs to lock when likely to need a sample. Also, add code to handle TSC wraparound in comparison. Perhaps this should move to rte_cycles.h? Signed-off-by: Stephen Hemminger <step...@networkplumber.org> Fixes: 5cd3cac9ed22 ("latency: added new library for latency stats") Cc: sta...@dpdk.org --- lib/latencystats/rte_latencystats.c | 51 ++++++++++++++++++----------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/lib/latencystats/rte_latencystats.c b/lib/latencystats/rte_latencystats.c index 6873a44a92..2b994656fb 100644 --- a/lib/latencystats/rte_latencystats.c +++ b/lib/latencystats/rte_latencystats.c @@ -22,6 +22,7 @@ #include <rte_metrics.h> #include <rte_spinlock.h> #include <rte_string_fns.h> +#include <rte_stdatomic.h> #include "rte_latencystats.h" @@ -45,11 +46,20 @@ timestamp_dynfield(struct rte_mbuf *mbuf) timestamp_dynfield_offset, rte_mbuf_timestamp_t *); } +/* Compare two 64 bit timer counter but deal with wraparound correctly. */ +static inline bool tsc_after(uint64_t t0, uint64_t t1) +{ + return (int64_t)(t1 - t0) < 0; +} + +#define tsc_before(a, b) tsc_after(b, a) + static const char *MZ_RTE_LATENCY_STATS = "rte_latencystats"; static int latency_stats_index; + +static rte_spinlock_t sample_lock = RTE_SPINLOCK_INITIALIZER; static uint64_t samp_intvl; -static uint64_t timer_tsc; -static uint64_t prev_tsc; +static RTE_ATOMIC(uint64_t) next_tsc; #define LATENCY_AVG_SCALE 4 #define LATENCY_JITTER_SCALE 16 @@ -147,25 +157,27 @@ add_time_stamps(uint16_t pid __rte_unused, void *user_cb __rte_unused) { unsigned int i; - uint64_t diff_tsc, now; + uint64_t now = rte_rdtsc(); - /* - * For every sample interval, - * time stamp is marked on one received packet. 
- */ - now = rte_rdtsc(); - for (i = 0; i < nb_pkts; i++) { - diff_tsc = now - prev_tsc; - timer_tsc += diff_tsc; - - if ((pkts[i]->ol_flags & timestamp_dynflag) == 0 - && (timer_tsc >= samp_intvl)) { - *timestamp_dynfield(pkts[i]) = now; - pkts[i]->ol_flags |= timestamp_dynflag; - timer_tsc = 0; + /* Check without locking */ + if (likely(tsc_before(now, rte_atomic_load_explicit(&next_tsc, rte_memory_order_relaxed)))) + return nb_pkts; + + /* Try and get sample, skip if sample is being done by other core. */ + if (likely(rte_spinlock_trylock(&sample_lock))) { + for (i = 0; i < nb_pkts; i++) { + struct rte_mbuf *m = pkts[i]; + + /* skip if already timestamped */ + if (unlikely(m->ol_flags & timestamp_dynflag)) + continue; + + m->ol_flags |= timestamp_dynflag; + *timestamp_dynfield(m) = now; + rte_atomic_store_explicit(&next_tsc, now + samp_intvl, rte_memory_order_relaxed); + break; } - prev_tsc = now; - now = rte_rdtsc(); + rte_spinlock_unlock(&sample_lock); } return nb_pkts; @@ -270,6 +282,7 @@ rte_latencystats_init(uint64_t app_samp_intvl, glob_stats = mz->addr; rte_spinlock_init(&glob_stats->lock); samp_intvl = (uint64_t)(app_samp_intvl * cycles_per_ns); + next_tsc = rte_rdtsc(); /** Register latency stats with stats library */ for (i = 0; i < NUM_LATENCY_STATS; i++) -- 2.47.2