On Sat Jun 13, 2026 at 9:48 PM EDT, Tamir Duberstein wrote:
> After consuming the last visible record, ringbuf_process_ring()
> publishes the consumer position and checks the producer position. These
> operations lack a full StoreLoad barrier. A producer can therefore
> commit a new record but read the old consumer position while the
> consumer reads the old producer position. The producer sends no
> notification and the consumer waits despite a queued record.
>
> Insert a full barrier before checking for new data, ensuring that either
> the consumer observes the producer update or the producer observes the
> consumer update and sends a notification. Apply the same handshake when
> a busy record follows records whose consumer position was published.
>
> Add an edge-triggered epoll test with a concurrent producer. Without the
> barrier, a missed notification leaves the producer dropping records from
> a full ring while the consumer times out. Document that bounded
> consumers and callbacks that terminate consumption must drain before
> waiting again.
>
> Fixes: bf99c936f947 ("libbpf: Add BPF ring buffer support")
> Reported-by: Andrew Werner <[email protected]>
> Assisted-by: Codex:gpt-5.5
> Signed-off-by: Tamir Duberstein <[email protected]>
> ---
> tools/lib/bpf/libbpf.h | 22 +++++++
> tools/lib/bpf/ringbuf.c | 14 +++-
> tools/testing/selftests/bpf/prog_tests/ringbuf.c | 84
> ++++++++++++++++++++++++
> 3 files changed, 117 insertions(+), 3 deletions(-)
>
> diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
> index 9ba6b9ad3498..a3b8f606a91d 100644
> --- a/tools/lib/bpf/libbpf.h
> +++ b/tools/lib/bpf/libbpf.h
> @@ -1439,6 +1439,10 @@ struct ring_buffer;
> struct ring;
> struct user_ring_buffer;
>
> +/* A negative return stops consumption; non-negative values continue.
> Stopping
Sashiko is right about this; let's use the regular kernel style for new
comments.
> + * can leave records queued without a new readiness notification. Before
> + * waiting for readiness again, consume until no records remain.
> + */
> typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
>
> struct ring_buffer_opts {
> @@ -1455,6 +1459,20 @@ LIBBPF_API int ring_buffer__add(struct ring_buffer
> *rb, int map_fd,
> ring_buffer_sample_fn sample_cb, void *ctx);
> LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
> LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
> +
> +/**
> + * @brief **ring_buffer__consume_n()** consumes up to a requested number of
> + * records from a ring buffer manager without event polling.
> + *
> + * @param rb A ring buffer manager object.
> + * @param n Maximum number of records to consume.
> + * @return The number of records consumed, or a negative error code on
> failure.
> + *
> + * Reaching the requested bound does not establish that every ring is empty.
> + * Records can remain queued without a new readiness notification. Before
> + * waiting on ring_buffer__epoll_fd(), call ring_buffer__consume() until it
> + * returns 0.
> + */
> LIBBPF_API int ring_buffer__consume_n(struct ring_buffer *rb, size_t n);
> LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
>
> @@ -1537,6 +1555,10 @@ LIBBPF_API int ring__consume(struct ring *r);
> * @param r A ringbuffer object.
> * @param n Maximum number of records to consume.
> * @return The number of records consumed, or a negative error code on
> failure.
> + *
> + * Reaching the requested bound does not establish that the ring is empty.
> + * Records can remain queued without a new readiness notification. Before
> + * waiting on ring__map_fd(), call ring__consume() until it returns 0.
> */
> LIBBPF_API int ring__consume_n(struct ring *r, size_t n);
>
> diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> index 1c24a83f59d5..ea8909fec4e9 100644
> --- a/tools/lib/bpf/ringbuf.c
> +++ b/tools/lib/bpf/ringbuf.c
> @@ -255,7 +255,7 @@ static int64_t ringbuf_process_ring(struct ring *r,
> size_t n)
> /* 64-bit to avoid overflow in case of extreme application behavior */
> int64_t cnt = 0;
> unsigned long cons_pos, prod_pos;
> - bool got_new_data;
> + bool got_new_data, needs_wakeup = false;
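The extra new variable feels overly complicated. I think this is because
the new code follows the got_new_data pattern, which IMO is also
unnecessary. We should remove got_new_data and just do:

	while (true) {
		prod_pos = __atomic_load_n(r->producer_pos, __ATOMIC_ACQUIRE);
		if (cons_pos == prod_pos)
			break;
		while (cons_pos != prod_pos) {
			...
		}
	}

One caveat: with this outer loop, the busy-record case in the inner loop
must still bail out (goto done). With the patch's break, the outer loop
would keep re-reading producer_pos and spin until the record is committed.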
> void *sample;
>
> err = ringbuf_validate(r);
> @@ -267,14 +267,21 @@ static int64_t ringbuf_process_ring(struct ring *r,
> size_t n)
> cons_pos = __atomic_load_n(r->consumer_pos, __ATOMIC_ACQUIRE);
> do {
> got_new_data = false;
> + if (needs_wakeup) {
> + /* Ensure either this sees a new record or its producer
> sees
> + * the updated consumer position and sends a
> notification.
> + */
> + __atomic_thread_fence(__ATOMIC_SEQ_CST);
> + needs_wakeup = false;
> + }
On this one, to address Sashiko's warning, I think that instead of the
if block above...
> prod_pos = __atomic_load_n(r->producer_pos, __ATOMIC_ACQUIRE);
> while (cons_pos != prod_pos) {
> len_ptr = r->data + (cons_pos & r->mask);
> len = __atomic_load_n(len_ptr, __ATOMIC_ACQUIRE);
>
> - /* sample not committed yet, bail out for now */
> + /* Retry a busy record once after publishing prior
> records. */
> if (len & BPF_RINGBUF_BUSY_BIT)
> - goto done;
> + break;
>
> got_new_data = true;
> cons_pos += roundup_len(len);
> @@ -294,6 +301,7 @@ static int64_t ringbuf_process_ring(struct ring *r,
> size_t n)
>
> __atomic_store_n(r->consumer_pos, cons_pos,
> __ATOMIC_RELEASE);
> + needs_wakeup = true;
>
> if (cnt >= n)
> goto done;
We can just add an smp_mb() right after the consumer_pos store, in place
of needs_wakeup = true. And since AFAICT smp_mb() is available here, we
can also avoid the move from smp_* to __atomic in the previous patch.
> diff --git a/tools/testing/selftests/bpf/prog_tests/ringbuf.c
> b/tools/testing/selftests/bpf/prog_tests/ringbuf.c
> index 9ce996bcea8c..5f0c679bf9a6 100644
> --- a/tools/testing/selftests/bpf/prog_tests/ringbuf.c
> +++ b/tools/testing/selftests/bpf/prog_tests/ringbuf.c
> @@ -492,6 +492,88 @@ static void ringbuf_null_cb_subtest(void)
> test_ringbuf_n_lskel__destroy(skel_n);
> }
>
> +#define N_WAKEUP_SAMPLES 20000
> +
> +struct wakeup_ctx {
> + bool stop;
> +};
> +
> +static void *wakeup_producer(void *arg)
> +{
> + struct wakeup_ctx *ctx = arg;
> +
> + while (!__atomic_load_n(&ctx->stop, __ATOMIC_RELAXED))
Why __ATOMIC_RELAXED here (and for the store in the cleanup path)? Maybe
just declare the field volatile and use plain reads and writes?
> + syscall(__NR_getpgid);
> + return NULL;
> +}
> +
> +static void ringbuf_wakeup_subtest(void)
> +{
> + struct test_ringbuf_n_lskel *skel_n;
> + struct ring_buffer *ringbuf = NULL;
> + struct epoll_event event = {
> + .events = EPOLLIN | EPOLLET,
> + };
> + struct wakeup_ctx ctx = {};
> + pthread_t producer;
> + int epoll_fd = -1;
> + int err, total = 0;
> +
> + skel_n = test_ringbuf_n_lskel__open();
> + if (!ASSERT_OK_PTR(skel_n, "test_ringbuf_n_lskel__open"))
> + return;
> +
> + skel_n->maps.ringbuf.max_entries = getpagesize();
> + skel_n->bss->pid = getpid();
> + skel_n->bss->value = SAMPLE_VALUE;
> +
> + err = test_ringbuf_n_lskel__load(skel_n);
> + if (!ASSERT_OK(err, "test_ringbuf_n_lskel__load"))
> + goto cleanup;
> +
> + err = test_ringbuf_n_lskel__attach(skel_n);
> + if (!ASSERT_OK(err, "test_ringbuf_n_lskel__attach"))
> + goto cleanup;
> +
> + ringbuf = ring_buffer__new(skel_n->maps.ringbuf.map_fd,
> + process_noop_sample, NULL, NULL);
> + if (!ASSERT_OK_PTR(ringbuf, "ring_buffer__new"))
> + goto cleanup;
> +
> + epoll_fd = epoll_create1(EPOLL_CLOEXEC);
> + if (!ASSERT_OK_FD(epoll_fd, "epoll_create1"))
> + goto cleanup_ringbuf;
> +
> + err = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, skel_n->maps.ringbuf.map_fd,
> + &event);
> + if (!ASSERT_OK(err, "epoll_ctl"))
> + goto cleanup_epoll;
> +
> + err = pthread_create(&producer, NULL, wakeup_producer, &ctx);
> + if (!ASSERT_OK(err, "pthread_create"))
> + goto cleanup_epoll;
> +
> + while (total < N_WAKEUP_SAMPLES) {
> + err = epoll_wait(epoll_fd, &event, 1, 1000);
> + if (!ASSERT_EQ(err, 1, "epoll_wait"))
> + goto cleanup_thread;
> + while ((err = ring_buffer__consume(ringbuf)) > 0)
> + total += err;
> + if (!ASSERT_OK(err, "ring_buffer__consume"))
> + goto cleanup_thread;
The cleanup_thread label is unnecessary; just use break in both error
paths, since the loop already falls through to the thread cleanup.
> + }
> +
> +cleanup_thread:
> + __atomic_store_n(&ctx.stop, true, __ATOMIC_RELAXED);
> + pthread_join(producer, NULL);
> +cleanup_epoll:
> + close(epoll_fd);
> +cleanup_ringbuf:
> + ring_buffer__free(ringbuf);
> +cleanup:
> + test_ringbuf_n_lskel__destroy(skel_n);
> +}
> +
> static void ringbuf_n_subtest(void)
> {
> struct test_ringbuf_n_lskel *skel_n;
> @@ -672,6 +754,8 @@ void test_ringbuf(void)
> ringbuf_n_subtest();
> if (test__start_subtest("ringbuf_null_cb"))
> ringbuf_null_cb_subtest();
> + if (test__start_subtest("ringbuf_wakeup"))
> + ringbuf_wakeup_subtest();
> if (test__start_subtest("ringbuf_map_key"))
> ringbuf_map_key_subtest();
> if (test__start_subtest("ringbuf_write"))
pw-bot: cr