On 10/18/2018 10:14 AM, Peter Zijlstra wrote:
> On Thu, Oct 18, 2018 at 01:10:15AM +0200, Daniel Borkmann wrote:
> 
>> Wouldn't this then also allow the kernel side to use smp_store_release()
>> when it updates the head? We'd be pretty much at the model as described
>> in Documentation/core-api/circular-buffers.rst.
>>
>> Meaning, a rough pseudo-code diff would look as follows:
>>
>> diff --git a/kernel/events/ring_buffer.c b/kernel/events/ring_buffer.c
>> index 5d3cf40..3d96275 100644
>> --- a/kernel/events/ring_buffer.c
>> +++ b/kernel/events/ring_buffer.c
>> @@ -84,8 +84,9 @@ static void perf_output_put_handle(struct perf_output_handle *handle)
>>       *
>>       * See perf_output_begin().
>>       */
>> -    smp_wmb(); /* B, matches C */
>> -    rb->user_page->data_head = head;
>> +
>> +    /* B, matches C */
>> +    smp_store_release(&rb->user_page->data_head, head);
> 
> Yes, this would be correct.
> 
> The reason we didn't do this is because smp_store_release() ends up
> being smp_mb() + WRITE_ONCE() for a fair number of platforms, even if
> they have a cheaper smp_wmb(). Most notably ARM.

Yep, agreed, that would be worse.
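
To make the cost concrete: on e.g. 32-bit ARM the generic fallback would
boil down to something like the below, i.e. the release store pays for a
full dmb where the current code only needs a store-store barrier
(illustrative macro names, not the exact kernel/tools ones):

#include <linux/compiler.h>	/* WRITE_ONCE() */

/* Roughly the ARMv7 flavour: */
#define example_smp_wmb()	asm volatile("dmb ishst" ::: "memory")	/* orders stores only */
#define example_smp_mb()	asm volatile("dmb ish" ::: "memory")	/* full barrier */

/* Generic smp_store_release() fallback: full barrier + plain store. */
#define example_store_release(p, v)		\
do {						\
	example_smp_mb();			\
	WRITE_ONCE(*(p), (v));			\
} while (0)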

> (ARM64 OTOH would like to have smp_store_release() there, I imagine;
> while x86 doesn't care either way).
> 
> A similar concern exists for the smp_load_acquire() I proposed for the
> userspace side, ARM would have to resort to smp_mb() in that situation,
> instead of the cheaper smp_rmb().
> 
> The smp_store_release() on the userspace side will actually be of equal
> cost or cheaper, since it already has an smp_mb(). Most notably, x86 can
> avoid the barrier entirely, because TSO doesn't allow the LOAD-STORE reorder
> (it only allows the STORE-LOAD reorder). And PowerPC can use LWSYNC
> instead of SYNC.
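
Right, and for the tail update that boils down to the following comparison
on the user space side; a small sketch assuming the tools-side
smp_store_release() from the diff below (x86_64 shown, where mb() is an
mfence and the release store only needs a compiler barrier):

#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/perf_event.h>
#include <asm/barrier.h>

/* What tools/perf does today for the data_tail update: */
static inline void tail_write_today(struct perf_event_mmap_page *pc, u64 tail)
{
	mb();			/* mfence on x86_64 */
	pc->data_tail = tail;
}

/* With a release store, TSO lets x86_64 drop the CPU fence entirely: */
static inline void tail_write_release(struct perf_event_mmap_page *pc, u64 tail)
{
	smp_store_release(&pc->data_tail, tail);	/* barrier() + WRITE_ONCE() */
}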

Ok, thanks a lot for your feedback, Peter! I've now changed the user space
side to the diff below, also moving the logic into a small helper so it can
be reused by libbpf in the subsequent fix in this series (see the usage
sketch right after the diffstat):

 tools/arch/arm64/include/asm/barrier.h    | 70 +++++++++++++++++++++++++++++++
 tools/arch/ia64/include/asm/barrier.h     | 13 ++++++
 tools/arch/powerpc/include/asm/barrier.h  | 16 +++++++
 tools/arch/s390/include/asm/barrier.h     | 13 ++++++
 tools/arch/sparc/include/asm/barrier_64.h | 13 ++++++
 tools/arch/x86/include/asm/barrier.h      | 14 +++++++
 tools/include/asm/barrier.h               | 35 ++++++++++++++++
 tools/include/linux/ring_buffer.h         | 69 ++++++++++++++++++++++++++++++
 tools/perf/util/mmap.h                    | 15 ++-----
 9 files changed, 246 insertions(+), 12 deletions(-)
 create mode 100644 tools/include/linux/ring_buffer.h
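
Usage sketch for the libbpf side mentioned above, just to show where the two
new helpers from tools/include/linux/ring_buffer.h slot in; the function name
and callback are made up and wrap-around handling is omitted, so this is not
the actual follow-up patch:

#include <linux/types.h>
#include <linux/perf_event.h>
#include <linux/ring_buffer.h>

static void example_drain_ring(struct perf_event_mmap_page *header,
			       unsigned char *base, u64 mask,
			       void (*fn)(struct perf_event_header *ev))
{
	/* C: pairs with the kernel's B, the smp_wmb() before the ->data_head update. */
	u64 head = ring_buffer_read_head(header);
	u64 tail = header->data_tail;

	while (tail != head) {
		struct perf_event_header *ev;

		ev = (struct perf_event_header *)(base + (tail & mask));
		fn(ev);
		tail += ev->size;
	}

	/* D: pairs with the kernel's A, the producer-side ->data_tail load. */
	ring_buffer_write_tail(header, tail);
}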

diff --git a/tools/arch/arm64/include/asm/barrier.h b/tools/arch/arm64/include/asm/barrier.h
index 40bde6b..12835ea 100644
--- a/tools/arch/arm64/include/asm/barrier.h
+++ b/tools/arch/arm64/include/asm/barrier.h
@@ -14,4 +14,74 @@
 #define wmb()          asm volatile("dmb ishst" ::: "memory")
 #define rmb()          asm volatile("dmb ishld" ::: "memory")

+#define smp_store_release(p, v)                                        \
+do {                                                           \
+       union { typeof(*p) __val; char __c[1]; } __u =          \
+               { .__val = (__force typeof(*p)) (v) };          \
+                                                               \
+       switch (sizeof(*p)) {                                   \
+       case 1:                                                 \
+               asm volatile ("stlrb %w1, %0"                   \
+                               : "=Q" (*p)                     \
+                               : "r" (*(__u8 *)__u.__c)        \
+                               : "memory");                    \
+               break;                                          \
+       case 2:                                                 \
+               asm volatile ("stlrh %w1, %0"                   \
+                               : "=Q" (*p)                     \
+                               : "r" (*(__u16 *)__u.__c)       \
+                               : "memory");                    \
+               break;                                          \
+       case 4:                                                 \
+               asm volatile ("stlr %w1, %0"                    \
+                               : "=Q" (*p)                     \
+                               : "r" (*(__u32 *)__u.__c)       \
+                               : "memory");                    \
+               break;                                          \
+       case 8:                                                 \
+               asm volatile ("stlr %1, %0"                     \
+                               : "=Q" (*p)                     \
+                               : "r" (*(__u64 *)__u.__c)       \
+                               : "memory");                    \
+               break;                                          \
+       default:                                                \
+               /* Only to shut up gcc ... */                   \
+               mb();                                           \
+               break;                                          \
+       }                                                       \
+} while (0)
+
+#define smp_load_acquire(p)                                    \
+({                                                             \
+       union { typeof(*p) __val; char __c[1]; } __u;           \
+                                                               \
+       switch (sizeof(*p)) {                                   \
+       case 1:                                                 \
+               asm volatile ("ldarb %w0, %1"                   \
+                       : "=r" (*(__u8 *)__u.__c)               \
+                       : "Q" (*p) : "memory");                 \
+               break;                                          \
+       case 2:                                                 \
+               asm volatile ("ldarh %w0, %1"                   \
+                       : "=r" (*(__u16 *)__u.__c)              \
+                       : "Q" (*p) : "memory");                 \
+               break;                                          \
+       case 4:                                                 \
+               asm volatile ("ldar %w0, %1"                    \
+                       : "=r" (*(__u32 *)__u.__c)              \
+                       : "Q" (*p) : "memory");                 \
+               break;                                          \
+       case 8:                                                 \
+               asm volatile ("ldar %0, %1"                     \
+                       : "=r" (*(__u64 *)__u.__c)              \
+                       : "Q" (*p) : "memory");                 \
+               break;                                          \
+       default:                                                \
+               /* Only to shut up gcc ... */                   \
+               mb();                                           \
+               break;                                          \
+       }                                                       \
+       __u.__val;                                              \
+})
+
 #endif /* _TOOLS_LINUX_ASM_AARCH64_BARRIER_H */
diff --git a/tools/arch/ia64/include/asm/barrier.h b/tools/arch/ia64/include/asm/barrier.h
index d808ee0..4d471d9 100644
--- a/tools/arch/ia64/include/asm/barrier.h
+++ b/tools/arch/ia64/include/asm/barrier.h
@@ -46,4 +46,17 @@
 #define rmb()          mb()
 #define wmb()          mb()

+#define smp_store_release(p, v)                        \
+do {                                           \
+       barrier();                              \
+       WRITE_ONCE(*p, v);                      \
+} while (0)
+
+#define smp_load_acquire(p)                    \
+({                                             \
+       typeof(*p) ___p1 = READ_ONCE(*p);       \
+       barrier();                              \
+       ___p1;                                  \
+})
+
 #endif /* _TOOLS_LINUX_ASM_IA64_BARRIER_H */
diff --git a/tools/arch/powerpc/include/asm/barrier.h b/tools/arch/powerpc/include/asm/barrier.h
index a634da0..905a2c6 100644
--- a/tools/arch/powerpc/include/asm/barrier.h
+++ b/tools/arch/powerpc/include/asm/barrier.h
@@ -27,4 +27,20 @@
 #define rmb()  __asm__ __volatile__ ("sync" : : : "memory")
 #define wmb()  __asm__ __volatile__ ("sync" : : : "memory")

+#if defined(__powerpc64__)
+#define smp_lwsync()   __asm__ __volatile__ ("lwsync" : : : "memory")
+
+#define smp_store_release(p, v)                        \
+do {                                           \
+       smp_lwsync();                           \
+       WRITE_ONCE(*p, v);                      \
+} while (0)
+
+#define smp_load_acquire(p)                    \
+({                                             \
+       typeof(*p) ___p1 = READ_ONCE(*p);       \
+       smp_lwsync();                           \
+       ___p1;                                  \
+})
+#endif /* defined(__powerpc64__) */
 #endif /* _TOOLS_LINUX_ASM_POWERPC_BARRIER_H */
diff --git a/tools/arch/s390/include/asm/barrier.h b/tools/arch/s390/include/asm/barrier.h
index 5030c99..de362fa6 100644
--- a/tools/arch/s390/include/asm/barrier.h
+++ b/tools/arch/s390/include/asm/barrier.h
@@ -28,4 +28,17 @@
 #define rmb()                          mb()
 #define wmb()                          mb()

+#define smp_store_release(p, v)                        \
+do {                                           \
+       barrier();                              \
+       WRITE_ONCE(*p, v);                      \
+} while (0)
+
+#define smp_load_acquire(p)                    \
+({                                             \
+       typeof(*p) ___p1 = READ_ONCE(*p);       \
+       barrier();                              \
+       ___p1;                                  \
+})
+
 #endif /* __TOOLS_LIB_ASM_BARRIER_H */
diff --git a/tools/arch/sparc/include/asm/barrier_64.h b/tools/arch/sparc/include/asm/barrier_64.h
index ba61344..cfb0fdc 100644
--- a/tools/arch/sparc/include/asm/barrier_64.h
+++ b/tools/arch/sparc/include/asm/barrier_64.h
@@ -40,4 +40,17 @@ do { __asm__ __volatile__("ba,pt     %%xcc, 1f\n\t" \
 #define rmb()  __asm__ __volatile__("":::"memory")
 #define wmb()  __asm__ __volatile__("":::"memory")

+#define smp_store_release(p, v)                        \
+do {                                           \
+       barrier();                              \
+       WRITE_ONCE(*p, v);                      \
+} while (0)
+
+#define smp_load_acquire(p)                    \
+({                                             \
+       typeof(*p) ___p1 = READ_ONCE(*p);       \
+       barrier();                              \
+       ___p1;                                  \
+})
+
 #endif /* !(__TOOLS_LINUX_SPARC64_BARRIER_H) */
diff --git a/tools/arch/x86/include/asm/barrier.h b/tools/arch/x86/include/asm/barrier.h
index 8774dee..5891986 100644
--- a/tools/arch/x86/include/asm/barrier.h
+++ b/tools/arch/x86/include/asm/barrier.h
@@ -26,4 +26,18 @@
 #define wmb()  asm volatile("sfence" ::: "memory")
 #endif

+#if defined(__x86_64__)
+#define smp_store_release(p, v)                        \
+do {                                           \
+       barrier();                              \
+       WRITE_ONCE(*p, v);                      \
+} while (0)
+
+#define smp_load_acquire(p)                    \
+({                                             \
+       typeof(*p) ___p1 = READ_ONCE(*p);       \
+       barrier();                              \
+       ___p1;                                  \
+})
+#endif /* defined(__x86_64__) */
 #endif /* _TOOLS_LINUX_ASM_X86_BARRIER_H */
diff --git a/tools/include/asm/barrier.h b/tools/include/asm/barrier.h
index 391d942..8d378c5 100644
--- a/tools/include/asm/barrier.h
+++ b/tools/include/asm/barrier.h
@@ -1,4 +1,5 @@
 /* SPDX-License-Identifier: GPL-2.0 */
+#include <linux/compiler.h>
 #if defined(__i386__) || defined(__x86_64__)
 #include "../../arch/x86/include/asm/barrier.h"
 #elif defined(__arm__)
@@ -26,3 +27,37 @@
 #else
 #include <asm-generic/barrier.h>
 #endif
+
+/*
+ * Generic fallback smp_*() definitions for archs that haven't
+ * been updated yet.
+ */
+
+#ifndef smp_rmb
+# define smp_rmb()     rmb()
+#endif
+
+#ifndef smp_wmb
+# define smp_wmb()     wmb()
+#endif
+
+#ifndef smp_mb
+# define smp_mb()      mb()
+#endif
+
+#ifndef smp_store_release
+# define smp_store_release(p, v)               \
+do {                                           \
+       smp_mb();                               \
+       WRITE_ONCE(*p, v);                      \
+} while (0)
+#endif
+
+#ifndef smp_load_acquire
+# define smp_load_acquire(p)                   \
+({                                             \
+       typeof(*p) ___p1 = READ_ONCE(*p);       \
+       smp_mb();                               \
+       ___p1;                                  \
+})
+#endif
diff --git a/tools/include/linux/ring_buffer.h b/tools/include/linux/ring_buffer.h
new file mode 100644
index 0000000..48200e0
--- /dev/null
+++ b/tools/include/linux/ring_buffer.h
@@ -0,0 +1,69 @@
+#ifndef _TOOLS_LINUX_RING_BUFFER_H_
+#define _TOOLS_LINUX_RING_BUFFER_H_
+
+#include <linux/compiler.h>
+#include <asm/barrier.h>
+
+/*
+ * Below barriers pair as follows (kernel/events/ring_buffer.c):
+ *
+ * Since the mmap() consumer (userspace) can run on a different CPU:
+ *
+ *   kernel                             user
+ *
+ *   if (LOAD ->data_tail) {            LOAD ->data_head
+ *                      (A)             smp_rmb()       (C)
+ *      STORE $data                     LOAD $data
+ *      smp_wmb()       (B)             smp_mb()        (D)
+ *      STORE ->data_head               STORE ->data_tail
+ *   }
+ *
+ * Where A pairs with D, and B pairs with C.
+ *
+ * In our case A is a control dependency that separates the load
+ * of the ->data_tail and the stores of $data. In case ->data_tail
+ * indicates there is no room in the buffer to store $data we do not.
+ *
+ * D needs to be a full barrier since it separates the data READ
+ * from the tail WRITE.
+ *
+ * For B a WMB is sufficient since it separates two WRITEs, and for
+ * C an RMB is sufficient since it separates two READs.
+ */
+
+/*
+ * Note, instead of B, C, D we could also use smp_store_release()
+ * in B and D as well as smp_load_acquire() in C. However, this
+ * optimization makes sense not for all architectures since it
+ * would resolve into READ_ONCE() + smp_mb() pair for smp_load_acquire()
+ * and smp_mb() + WRITE_ONCE() pair for smp_store_release(), thus
+ * for those smp_wmb() in B and smp_rmb() in C would still be less
+ * expensive. For the case of D this has either the same cost or
+ * is less expensive. For example, due to TSO (total store order),
+ * x86 can avoid the CPU barrier entirely.
+ */
+
+static inline u64 ring_buffer_read_head(struct perf_event_mmap_page *base)
+{
+/*
+ * Architectures where smp_load_acquire() does not fallback to
+ * READ_ONCE() + smp_mb() pair.
+ */
+#if defined(__x86_64__) || defined(__aarch64__) || defined(__powerpc64__) || \
+    defined(__ia64__) || defined(__sparc__) && defined(__arch64__)
+       return smp_load_acquire(&base->data_head);
+#else
+       u64 head = READ_ONCE(base->data_head);
+
+       smp_rmb();
+       return head;
+#endif
+}
+
+static inline void ring_buffer_write_tail(struct perf_event_mmap_page *base,
+                                         u64 tail)
+{
+       smp_store_release(&base->data_tail, tail);
+}
+
+#endif /* _TOOLS_LINUX_RING_BUFFER_H_ */
diff --git a/tools/perf/util/mmap.h b/tools/perf/util/mmap.h
index 05a6d47..8f6531f 100644
--- a/tools/perf/util/mmap.h
+++ b/tools/perf/util/mmap.h
@@ -4,7 +4,7 @@
 #include <linux/compiler.h>
 #include <linux/refcount.h>
 #include <linux/types.h>
-#include <asm/barrier.h>
+#include <linux/ring_buffer.h>
 #include <stdbool.h>
 #include "auxtrace.h"
 #include "event.h"
@@ -71,21 +71,12 @@ void perf_mmap__consume(struct perf_mmap *map);

 static inline u64 perf_mmap__read_head(struct perf_mmap *mm)
 {
-       struct perf_event_mmap_page *pc = mm->base;
-       u64 head = READ_ONCE(pc->data_head);
-       rmb();
-       return head;
+       return ring_buffer_read_head(mm->base);
 }

 static inline void perf_mmap__write_tail(struct perf_mmap *md, u64 tail)
 {
-       struct perf_event_mmap_page *pc = md->base;
-
-       /*
-        * ensure all reads are done before we write the tail out.
-        */
-       mb();
-       pc->data_tail = tail;
+       ring_buffer_write_tail(md->base, tail);
 }

 union perf_event *perf_mmap__read_forward(struct perf_mmap *map);
-- 
2.9.5
