From: Gaetan Rivet <[email protected]>

Add a way to schedule deferred execution with the RCU using memory
embedded within the object being scheduled, when such a field is available.

This way, freeing a high volume of objects does not require many small
allocations, which could otherwise increase heap fragmentation and
memory pressure.

Signed-off-by: Gaetan Rivet <[email protected]>
Co-authored-by: Eli Britstein <[email protected]>
Signed-off-by: Eli Britstein <[email protected]>
---
 lib/guarded-list.c |  10 ++++
 lib/guarded-list.h |   2 +
 lib/ovs-rcu.c      | 110 ++++++++++++++++++++++---------------
 lib/ovs-rcu.h      |  39 ++++++++++++++
 tests/test-rcu.c   | 131 +++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 249 insertions(+), 43 deletions(-)

diff --git a/lib/guarded-list.c b/lib/guarded-list.c
index 2186d074e..bb77fb55f 100644
--- a/lib/guarded-list.c
+++ b/lib/guarded-list.c
@@ -65,6 +65,16 @@ guarded_list_push_back(struct guarded_list *list,
     return retval;
 }
 
+void
+guarded_list_push_back_all(struct guarded_list *list,
+                           struct ovs_list *nodes, size_t n)
+{
+    ovs_mutex_lock(&list->mutex);
+    ovs_list_push_back_all(&list->list, nodes);
+    list->n += n;
+    ovs_mutex_unlock(&list->mutex);
+}
+
 struct ovs_list *
 guarded_list_pop_front(struct guarded_list *list)
 {
diff --git a/lib/guarded-list.h b/lib/guarded-list.h
index 80ce22c12..b575dc425 100644
--- a/lib/guarded-list.h
+++ b/lib/guarded-list.h
@@ -40,6 +40,8 @@ bool guarded_list_is_empty(const struct guarded_list *);
 
 size_t guarded_list_push_back(struct guarded_list *, struct ovs_list *,
                               size_t max);
+void guarded_list_push_back_all(struct guarded_list *, struct ovs_list *,
+                                size_t n);
 struct ovs_list *guarded_list_pop_front(struct guarded_list *);
 size_t guarded_list_pop_all(struct guarded_list *, struct ovs_list *);
 
diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
index 49afcc55c..54e6c469d 100644
--- a/lib/ovs-rcu.c
+++ b/lib/ovs-rcu.c
@@ -38,7 +38,7 @@ struct ovsrcu_cb {
 };
 
 struct ovsrcu_cbset {
-    struct ovs_list list_node;
+    struct ovsrcu_node rcu_node;
     struct ovsrcu_cb *cbs;
     size_t n_allocated;
     int n_cbs;
@@ -49,6 +49,8 @@ struct ovsrcu_perthread {
 
     uint64_t seqno;
     struct ovsrcu_cbset *cbset;
+    struct ovs_list pending;    /* Thread-local list of ovsrcu_node. */
+    size_t n_pending;
     char name[16];              /* This thread's name. */
 };
 
@@ -58,15 +60,15 @@ static pthread_key_t perthread_key;
 static struct ovs_list ovsrcu_threads;
 static struct ovs_mutex ovsrcu_threads_mutex;
 
-static struct guarded_list flushed_cbsets;
-static struct seq *flushed_cbsets_seq;
+static struct guarded_list flushed_nodes;
+static struct seq *flushed_nodes_seq;
 
 static struct latch postpone_exit;
 static struct ovs_barrier postpone_barrier;
 
 static void ovsrcu_init_module(void);
-static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
-static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
+static void ovsrcu_flush_nodes__(struct ovsrcu_perthread *, bool);
+static void ovsrcu_flush_nodes(struct ovsrcu_perthread *);
 static void ovsrcu_unregister__(struct ovsrcu_perthread *);
 static bool ovsrcu_call_postponed(void);
 static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
@@ -85,6 +87,8 @@ ovsrcu_perthread_get(void)
         perthread = xmalloc(sizeof *perthread);
         perthread->seqno = seq_read(global_seqno);
         perthread->cbset = NULL;
+        ovs_list_init(&perthread->pending);
+        perthread->n_pending = 0;
         ovs_strlcpy(perthread->name, name[0] ? name : "main",
                     sizeof perthread->name);
 
@@ -153,9 +157,7 @@ ovsrcu_quiesce(void)
 
     perthread = ovsrcu_perthread_get();
     perthread->seqno = seq_read(global_seqno);
-    if (perthread->cbset) {
-        ovsrcu_flush_cbset(perthread);
-    }
+    ovsrcu_flush_nodes(perthread);
     seq_change(global_seqno);
 
     ovsrcu_quiesced();
@@ -171,9 +173,7 @@ ovsrcu_try_quiesce(void)
     perthread = ovsrcu_perthread_get();
     if (!seq_try_lock()) {
         perthread->seqno = seq_read(global_seqno);
-        if (perthread->cbset) {
-            ovsrcu_flush_cbset__(perthread, true);
-        }
+        ovsrcu_flush_nodes__(perthread, true);
         seq_change_protected(global_seqno);
         seq_unlock();
         ovsrcu_quiesced();
@@ -264,10 +264,10 @@ ovsrcu_exit(void)
     /* Repeatedly:
      *
      *    - Wait for a grace period.  One important side effect is to push the
-     *      running thread's cbset into 'flushed_cbsets' so that the next call
+     *      running thread's nodes into 'flushed_nodes' so that the next call
      *      has something to call.
      *
-     *    - Call all the callbacks in 'flushed_cbsets'.  If there aren't any,
+     *    - Call all the callbacks in 'flushed_nodes'.  If there aren't any,
      *      we're done, otherwise the callbacks themselves might have requested
      *      more deferred callbacks so we go around again.
      *
@@ -282,6 +282,32 @@ ovsrcu_exit(void)
     }
 }
 
+static void
+ovsrcu_run_cbset(void *aux)
+{
+    struct ovsrcu_cbset *cbset = aux;
+    struct ovsrcu_cb *cb;
+
+    for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
+        cb->function(cb->aux);
+    }
+
+    free(cbset->cbs);
+    free(cbset);
+}
+
+void
+ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux,
+                           struct ovsrcu_node *rcu_node)
+{
+    struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
+
+    rcu_node->cb = function;
+    rcu_node->aux = aux;
+    ovs_list_push_back(&perthread->pending, &rcu_node->list_node);
+    perthread->n_pending++;
+}
+
 /* Registers 'function' to be called, passing 'aux' as argument, after the
  * next grace period.
  *
@@ -314,6 +340,7 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
         cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs);
         cbset->n_allocated = MIN_CBS;
         cbset->n_cbs = 0;
+        ovsrcu_postpone_embedded(ovsrcu_run_cbset, cbset, rcu_node);
     }
 
     if (cbset->n_cbs == cbset->n_allocated) {
@@ -329,24 +356,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
 static bool OVS_NO_SANITIZE_FUNCTION
 ovsrcu_call_postponed(void)
 {
-    struct ovsrcu_cbset *cbset;
-    struct ovs_list cbsets;
+    struct ovs_list nodes = OVS_LIST_INITIALIZER(&nodes);
+    struct ovsrcu_node *node;
 
-    guarded_list_pop_all(&flushed_cbsets, &cbsets);
-    if (ovs_list_is_empty(&cbsets)) {
+    guarded_list_pop_all(&flushed_nodes, &nodes);
+    if (ovs_list_is_empty(&nodes)) {
         return false;
     }
 
     ovsrcu_synchronize();
 
-    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
-        struct ovsrcu_cb *cb;
-
-        for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
-            cb->function(cb->aux);
-        }
-        free(cbset->cbs);
-        free(cbset);
+    LIST_FOR_EACH_POP (node, list_node, &nodes) {
+        node->cb(node->aux);
     }
 
     return true;
@@ -358,9 +379,9 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
     pthread_detach(pthread_self());
 
     while (!latch_is_set(&postpone_exit)) {
-        uint64_t seqno = seq_read(flushed_cbsets_seq);
+        uint64_t cb_seqno = seq_read(flushed_nodes_seq);
         if (!ovsrcu_call_postponed()) {
-            seq_wait(flushed_cbsets_seq, seqno);
+            seq_wait(flushed_nodes_seq, cb_seqno);
             latch_wait(&postpone_exit);
             poll_block();
         }
@@ -371,33 +392,36 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
 }
 
 static void
-ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
+ovsrcu_flush_nodes__(struct ovsrcu_perthread *perthread, bool protected)
 {
-    struct ovsrcu_cbset *cbset = perthread->cbset;
+    if (ovs_list_is_empty(&perthread->pending)) {
+        return;
+    }
 
-    if (cbset) {
-        guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
-        perthread->cbset = NULL;
+    perthread->cbset = NULL;
+    guarded_list_push_back_all(&flushed_nodes, &perthread->pending,
+                               perthread->n_pending);
+    ovs_list_init(&perthread->pending);
+    perthread->n_pending = 0;
 
-        if (protected) {
-            seq_change_protected(flushed_cbsets_seq);
-        } else {
-            seq_change(flushed_cbsets_seq);
-        }
+    if (protected) {
+        seq_change_protected(flushed_nodes_seq);
+    } else {
+        seq_change(flushed_nodes_seq);
     }
 }
 
 static void
-ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
+ovsrcu_flush_nodes(struct ovsrcu_perthread *perthread)
 {
-    ovsrcu_flush_cbset__(perthread, false);
+    ovsrcu_flush_nodes__(perthread, false);
 }
 
 static void
 ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
 {
-    if (perthread->cbset) {
-        ovsrcu_flush_cbset(perthread);
+    if (!ovs_list_is_empty(&perthread->pending)) {
+        ovsrcu_flush_nodes(perthread);
     }
 
     ovs_mutex_lock(&ovsrcu_threads_mutex);
@@ -438,8 +462,8 @@ ovsrcu_init_module(void)
         ovs_list_init(&ovsrcu_threads);
         ovs_mutex_init(&ovsrcu_threads_mutex);
 
-        guarded_list_init(&flushed_cbsets);
-        flushed_cbsets_seq = seq_create();
+        guarded_list_init(&flushed_nodes);
+        flushed_nodes_seq = seq_create();
 
         ovsthread_once_done(&once);
     }
diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
index a1c15c126..efd43a1a2 100644
--- a/lib/ovs-rcu.h
+++ b/lib/ovs-rcu.h
@@ -125,6 +125,22 @@
  *         ovs_mutex_unlock(&mutex);
  *     }
  *
+ * As an alternative to ovsrcu_postpone(), the same deferred execution can be
+ * achieved using ovsrcu_postpone_embedded():
+ *
+ *      struct deferrable {
+ *          struct ovsrcu_node rcu_node;
+ *      };
+ *
+ *      void
+ *      deferred_free(struct deferrable *d)
+ *      {
+ *          ovsrcu_postpone_embedded(free, d, rcu_node);
+ *      }
+ *
+ * Using an embedded field can sometimes be preferable, as it avoids the
+ * small allocations performed by ovsrcu_postpone().
+ *
  * In some rare cases an object may not be addressable with a pointer, but only
  * through an array index (e.g. because it's provided by another library).  It
  * is still possible to have RCU semantics by using the ovsrcu_index type.
@@ -173,6 +189,8 @@
 #include "compiler.h"
 #include "ovs-atomic.h"
 
+#include "openvswitch/list.h"
+
 #if __GNUC__
 #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; }
 #define OVSRCU_INITIALIZER(VALUE) { VALUE }
@@ -256,6 +274,27 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
      (void) sizeof(*(ARG)),                                     \
      ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
 
+struct ovsrcu_node {
+    struct ovs_list list_node;
+    void (*cb)(void *aux);
+    void *aux;
+};
+
+/* Schedules FUNCTION to be called, passing ARG as its pointer-type
+ * argument, after the next grace period.  ARG must point to a structure
+ * containing an 'ovsrcu_node' field named MEMBER, which stores the
+ * deferred call in place of a separate allocation.  See 'Usage' above
+ * for an example. */
+void ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux,
+                                struct ovsrcu_node *node);
+#define ovsrcu_postpone_embedded(FUNCTION, ARG, MEMBER)             \
+    (/* Verify that ARG is appropriate for FUNCTION. */             \
+     (void) sizeof((FUNCTION)(ARG), 1),                             \
+     /* Verify that ARG is a pointer type. */                       \
+     (void) sizeof(*(ARG)),                                         \
+     ovsrcu_postpone_embedded__((void (*)(void *))(FUNCTION), ARG,  \
+                                &(ARG)->MEMBER))
+
 /* An array index protected by RCU semantics.  This is an easier alternative to
  * an RCU protected pointer to a malloc'd int. */
 typedef struct { atomic_int v; } ovsrcu_index;
diff --git a/tests/test-rcu.c b/tests/test-rcu.c
index bb17092bf..26150e7d9 100644
--- a/tests/test-rcu.c
+++ b/tests/test-rcu.c
@@ -17,11 +17,16 @@
 #include <config.h>
 #undef NDEBUG
 #include "fatal-signal.h"
+#include "ovs-atomic.h"
 #include "ovs-rcu.h"
 #include "ovs-thread.h"
 #include "ovstest.h"
+#include "seq.h"
+#include "timeval.h"
 #include "util.h"
 
+#include "openvswitch/poll-loop.h"
+
 static void *
 quiescer_main(void *aux OVS_UNUSED)
 {
@@ -67,10 +72,136 @@ test_rcu_barrier(void)
     ovs_assert(count == 10);
 }
 
+struct element {
+    struct ovsrcu_node rcu_node;
+    struct seq *trigger;
+    atomic_bool wait;
+};
+
+static void
+trigger_cb(void *e_)
+{
+    struct element *e = (struct element *) e_;
+
+    seq_change(e->trigger);
+}
+
+static void *
+wait_main(void *aux)
+{
+    struct element *e = aux;
+
+    for (;;) {
+        bool wait;
+
+        atomic_read(&e->wait, &wait);
+        if (!wait) {
+            break;
+        }
+    }
+
+    seq_wait(e->trigger, seq_read(e->trigger));
+    poll_block();
+
+    return NULL;
+}
+
+static void
+test_rcu_postpone_embedded(bool multithread)
+{
+    long long int timeout;
+    pthread_t waiter;
+    struct element e;
+    uint64_t seqno;
+
+    atomic_init(&e.wait, true);
+
+    if (multithread) {
+        waiter = ovs_thread_create("waiter", wait_main, &e);
+    }
+
+    e.trigger = seq_create();
+    seqno = seq_read(e.trigger);
+
+    ovsrcu_postpone_embedded(trigger_cb, &e, rcu_node);
+
+    /* Check that GC holds out until all threads are quiescent. */
+    timeout = time_msec();
+    if (multithread) {
+        timeout += 200;
+    }
+    while (time_msec() <= timeout) {
+        ovs_assert(seq_read(e.trigger) == seqno);
+    }
+
+    atomic_store(&e.wait, false);
+
+    seq_wait(e.trigger, seqno);
+    poll_timer_wait_until(time_msec() + 200);
+    poll_block();
+
+    /* Verify that GC executed. */
+    ovs_assert(seq_read(e.trigger) != seqno);
+    seq_destroy(e.trigger);
+
+    if (multithread) {
+        xpthread_join(waiter, NULL);
+    }
+}
+
+#define N_ORDER_CBS 5
+
+struct order_element {
+    struct ovsrcu_node rcu_node;
+    int id;
+    int *log;
+    int *log_idx;
+};
+
+static void
+order_cb(void *aux)
+{
+    struct order_element *e = aux;
+    e->log[(*e->log_idx)++] = e->id;
+}
+
+static void
+test_rcu_ordering(void)
+{
+    struct order_element elems[N_ORDER_CBS];
+    int log[N_ORDER_CBS];
+    int log_idx = 0;
+
+    for (int i = 0; i < N_ORDER_CBS; i++) {
+        elems[i].id = i;
+        elems[i].log = log;
+        elems[i].log_idx = &log_idx;
+        ovsrcu_postpone_embedded(order_cb, &elems[i], rcu_node);
+    }
+
+    ovsrcu_barrier();
+
+    ovs_assert(log_idx == N_ORDER_CBS);
+    for (int i = 0; i < N_ORDER_CBS; i++) {
+        if (log[i] != i) {
+            ovs_abort(0, "RCU embedded callback ordering violated: "
+                      "expected cb %d at position %d, got %d",
+                      i, i, log[i]);
+        }
+    }
+}
+
 static void
 test_rcu(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) {
+    const bool multithread = true;
+
+    /* Execute single-threaded check before spawning additional threads. */
+    test_rcu_postpone_embedded(!multithread);
+    test_rcu_postpone_embedded(multithread);
+
     test_rcu_quiesce();
     test_rcu_barrier();
+    test_rcu_ordering();
 }
 
 OVSTEST_REGISTER("test-rcu", test_rcu);
-- 
2.34.1

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to