From: Gaetan Rivet <[email protected]>

Add a way to schedule executions with the RCU using memory embedded within the object being scheduled, if applicable.
This way, freeing a high volume of objects does not require many small allocations, potentially increasing heap fragmentation and memory pressure. Signed-off-by: Gaetan Rivet <[email protected]> Co-authored-by: Eli Britstein <[email protected]> Signed-off-by: Eli Britstein <[email protected]> --- lib/guarded-list.c | 10 ++++ lib/guarded-list.h | 2 + lib/ovs-rcu.c | 110 ++++++++++++++++++++++--------------- lib/ovs-rcu.h | 39 ++++++++++++++ tests/test-rcu.c | 131 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 249 insertions(+), 43 deletions(-) diff --git a/lib/guarded-list.c b/lib/guarded-list.c index 2186d074e..bb77fb55f 100644 --- a/lib/guarded-list.c +++ b/lib/guarded-list.c @@ -65,6 +65,16 @@ guarded_list_push_back(struct guarded_list *list, return retval; } +void +guarded_list_push_back_all(struct guarded_list *list, + struct ovs_list *nodes, size_t n) +{ + ovs_mutex_lock(&list->mutex); + ovs_list_push_back_all(&list->list, nodes); + list->n += n; + ovs_mutex_unlock(&list->mutex); +} + struct ovs_list * guarded_list_pop_front(struct guarded_list *list) { diff --git a/lib/guarded-list.h b/lib/guarded-list.h index 80ce22c12..b575dc425 100644 --- a/lib/guarded-list.h +++ b/lib/guarded-list.h @@ -40,6 +40,8 @@ bool guarded_list_is_empty(const struct guarded_list *); size_t guarded_list_push_back(struct guarded_list *, struct ovs_list *, size_t max); +void guarded_list_push_back_all(struct guarded_list *, struct ovs_list *, + size_t n); struct ovs_list *guarded_list_pop_front(struct guarded_list *); size_t guarded_list_pop_all(struct guarded_list *, struct ovs_list *); diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c index 49afcc55c..54e6c469d 100644 --- a/lib/ovs-rcu.c +++ b/lib/ovs-rcu.c @@ -38,7 +38,7 @@ struct ovsrcu_cb { }; struct ovsrcu_cbset { - struct ovs_list list_node; + struct ovsrcu_node rcu_node; struct ovsrcu_cb *cbs; size_t n_allocated; int n_cbs; @@ -49,6 +49,8 @@ struct ovsrcu_perthread { uint64_t seqno; struct ovsrcu_cbset *cbset; + 
struct ovs_list pending; /* Thread-local list of ovsrcu_node. */ + size_t n_pending; char name[16]; /* This thread's name. */ }; @@ -58,15 +60,15 @@ static pthread_key_t perthread_key; static struct ovs_list ovsrcu_threads; static struct ovs_mutex ovsrcu_threads_mutex; -static struct guarded_list flushed_cbsets; -static struct seq *flushed_cbsets_seq; +static struct guarded_list flushed_nodes; +static struct seq *flushed_nodes_seq; static struct latch postpone_exit; static struct ovs_barrier postpone_barrier; static void ovsrcu_init_module(void); -static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool); -static void ovsrcu_flush_cbset(struct ovsrcu_perthread *); +static void ovsrcu_flush_nodes__(struct ovsrcu_perthread *, bool); +static void ovsrcu_flush_nodes(struct ovsrcu_perthread *); static void ovsrcu_unregister__(struct ovsrcu_perthread *); static bool ovsrcu_call_postponed(void); static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED); @@ -85,6 +87,8 @@ ovsrcu_perthread_get(void) perthread = xmalloc(sizeof *perthread); perthread->seqno = seq_read(global_seqno); perthread->cbset = NULL; + ovs_list_init(&perthread->pending); + perthread->n_pending = 0; ovs_strlcpy(perthread->name, name[0] ? name : "main", sizeof perthread->name); @@ -153,9 +157,7 @@ ovsrcu_quiesce(void) perthread = ovsrcu_perthread_get(); perthread->seqno = seq_read(global_seqno); - if (perthread->cbset) { - ovsrcu_flush_cbset(perthread); - } + ovsrcu_flush_nodes(perthread); seq_change(global_seqno); ovsrcu_quiesced(); @@ -171,9 +173,7 @@ ovsrcu_try_quiesce(void) perthread = ovsrcu_perthread_get(); if (!seq_try_lock()) { perthread->seqno = seq_read(global_seqno); - if (perthread->cbset) { - ovsrcu_flush_cbset__(perthread, true); - } + ovsrcu_flush_nodes__(perthread, true); seq_change_protected(global_seqno); seq_unlock(); ovsrcu_quiesced(); @@ -264,10 +264,10 @@ ovsrcu_exit(void) /* Repeatedly: * * - Wait for a grace period. 
One important side effect is to push the - * running thread's cbset into 'flushed_cbsets' so that the next call + * running thread's nodes into 'flushed_nodes' so that the next call * has something to call. * - * - Call all the callbacks in 'flushed_cbsets'. If there aren't any, + * - Call all the callbacks in 'flushed_nodes'. If there aren't any, * we're done, otherwise the callbacks themselves might have requested * more deferred callbacks so we go around again. * @@ -282,6 +282,32 @@ ovsrcu_exit(void) } } +static void +ovsrcu_run_cbset(void *aux) +{ + struct ovsrcu_cbset *cbset = aux; + struct ovsrcu_cb *cb; + + for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { + cb->function(cb->aux); + } + + free(cbset->cbs); + free(cbset); +} + +void +ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux, + struct ovsrcu_node *rcu_node) +{ + struct ovsrcu_perthread *perthread = ovsrcu_perthread_get(); + + rcu_node->cb = function; + rcu_node->aux = aux; + ovs_list_push_back(&perthread->pending, &rcu_node->list_node); + perthread->n_pending++; +} + /* Registers 'function' to be called, passing 'aux' as argument, after the * next grace period. 
* @@ -314,6 +340,7 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux) cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs); cbset->n_allocated = MIN_CBS; cbset->n_cbs = 0; + ovsrcu_postpone_embedded(ovsrcu_run_cbset, cbset, rcu_node); } if (cbset->n_cbs == cbset->n_allocated) { @@ -329,24 +356,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux) static bool OVS_NO_SANITIZE_FUNCTION ovsrcu_call_postponed(void) { - struct ovsrcu_cbset *cbset; - struct ovs_list cbsets; + struct ovs_list nodes = OVS_LIST_INITIALIZER(&nodes); + struct ovsrcu_node *node; - guarded_list_pop_all(&flushed_cbsets, &cbsets); - if (ovs_list_is_empty(&cbsets)) { + guarded_list_pop_all(&flushed_nodes, &nodes); + if (ovs_list_is_empty(&nodes)) { return false; } ovsrcu_synchronize(); - LIST_FOR_EACH_POP (cbset, list_node, &cbsets) { - struct ovsrcu_cb *cb; - - for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { - cb->function(cb->aux); - } - free(cbset->cbs); - free(cbset); + LIST_FOR_EACH_POP (node, list_node, &nodes) { + node->cb(node->aux); } return true; @@ -358,9 +379,9 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) pthread_detach(pthread_self()); while (!latch_is_set(&postpone_exit)) { - uint64_t seqno = seq_read(flushed_cbsets_seq); + uint64_t cb_seqno = seq_read(flushed_nodes_seq); if (!ovsrcu_call_postponed()) { - seq_wait(flushed_cbsets_seq, seqno); + seq_wait(flushed_nodes_seq, cb_seqno); latch_wait(&postpone_exit); poll_block(); } @@ -371,33 +392,36 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) } static void -ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected) +ovsrcu_flush_nodes__(struct ovsrcu_perthread *perthread, bool protected) { - struct ovsrcu_cbset *cbset = perthread->cbset; + if (ovs_list_is_empty(&perthread->pending)) { + return; + } - if (cbset) { - guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX); - perthread->cbset = NULL; + perthread->cbset = NULL; + guarded_list_push_back_all(&flushed_nodes, 
&perthread->pending, + perthread->n_pending); + ovs_list_init(&perthread->pending); + perthread->n_pending = 0; - if (protected) { - seq_change_protected(flushed_cbsets_seq); - } else { - seq_change(flushed_cbsets_seq); - } + if (protected) { + seq_change_protected(flushed_nodes_seq); + } else { + seq_change(flushed_nodes_seq); } } static void -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread) +ovsrcu_flush_nodes(struct ovsrcu_perthread *perthread) { - ovsrcu_flush_cbset__(perthread, false); + ovsrcu_flush_nodes__(perthread, false); } static void ovsrcu_unregister__(struct ovsrcu_perthread *perthread) { - if (perthread->cbset) { - ovsrcu_flush_cbset(perthread); + if (!ovs_list_is_empty(&perthread->pending)) { + ovsrcu_flush_nodes(perthread); } ovs_mutex_lock(&ovsrcu_threads_mutex); @@ -438,8 +462,8 @@ ovsrcu_init_module(void) ovs_list_init(&ovsrcu_threads); ovs_mutex_init(&ovsrcu_threads_mutex); - guarded_list_init(&flushed_cbsets); - flushed_cbsets_seq = seq_create(); + guarded_list_init(&flushed_nodes); + flushed_nodes_seq = seq_create(); ovsthread_once_done(&once); } diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h index a1c15c126..efd43a1a2 100644 --- a/lib/ovs-rcu.h +++ b/lib/ovs-rcu.h @@ -125,6 +125,22 @@ * ovs_mutex_unlock(&mutex); * } * + * As an alternative to ovsrcu_postpone(), the same deferred execution can be + * achieved using ovsrcu_postpone_embedded(): + * + * struct deferrable { + * struct ovsrcu_node rcu_node; + * }; + * + * void + * deferred_free(struct deferrable *d) + * { + * ovsrcu_postpone_embedded(free, d, rcu_node); + * } + * + * Using embedded fields can be preferred sometimes to avoid the small + * allocations done in ovsrcu_postpone(). + * * In some rare cases an object may not be addressable with a pointer, but only * through an array index (e.g. because it's provided by another library). It * is still possible to have RCU semantics by using the ovsrcu_index type. 
@@ -173,6 +189,8 @@ #include "compiler.h" #include "ovs-atomic.h" +#include "openvswitch/list.h" + #if __GNUC__ #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; } #define OVSRCU_INITIALIZER(VALUE) { VALUE } @@ -256,6 +274,27 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux); (void) sizeof(*(ARG)), \ ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG)) +struct ovsrcu_node { + struct ovs_list list_node; + void (*cb)(void *aux); + void *aux; +}; + +/* Calls FUNCTION passing ARG as its pointer-type argument, which + * contains an 'ovsrcu_node' as a field named MEMBER. The function + * is called following the next grace period. See 'Usage' above for an + * example. + */ +void ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux, + struct ovsrcu_node *node); +#define ovsrcu_postpone_embedded(FUNCTION, ARG, MEMBER) \ + (/* Verify that ARG is appropriate for FUNCTION. */ \ + (void) sizeof((FUNCTION)(ARG), 1), \ + /* Verify that ARG is a pointer type. */ \ + (void) sizeof(*(ARG)), \ + ovsrcu_postpone_embedded__((void (*)(void *))(FUNCTION), ARG, \ + &(ARG)->MEMBER)) + /* An array index protected by RCU semantics. This is an easier alternative to * an RCU protected pointer to a malloc'd int. 
*/ typedef struct { atomic_int v; } ovsrcu_index; diff --git a/tests/test-rcu.c b/tests/test-rcu.c index bb17092bf..26150e7d9 100644 --- a/tests/test-rcu.c +++ b/tests/test-rcu.c @@ -17,11 +17,16 @@ #include <config.h> #undef NDEBUG #include "fatal-signal.h" +#include "ovs-atomic.h" #include "ovs-rcu.h" #include "ovs-thread.h" #include "ovstest.h" +#include "seq.h" +#include "timeval.h" #include "util.h" +#include "openvswitch/poll-loop.h" + static void * quiescer_main(void *aux OVS_UNUSED) { @@ -67,10 +72,136 @@ test_rcu_barrier(void) ovs_assert(count == 10); } +struct element { + struct ovsrcu_node rcu_node; + struct seq *trigger; + atomic_bool wait; +}; + +static void +trigger_cb(void *e_) +{ + struct element *e = (struct element *) e_; + + seq_change(e->trigger); +} + +static void * +wait_main(void *aux) +{ + struct element *e = aux; + + for (;;) { + bool wait; + + atomic_read(&e->wait, &wait); + if (!wait) { + break; + } + } + + seq_wait(e->trigger, seq_read(e->trigger)); + poll_block(); + + return NULL; +} + +static void +test_rcu_postpone_embedded(bool multithread) +{ + long long int timeout; + pthread_t waiter; + struct element e; + uint64_t seqno; + + atomic_init(&e.wait, true); + + if (multithread) { + waiter = ovs_thread_create("waiter", wait_main, &e); + } + + e.trigger = seq_create(); + seqno = seq_read(e.trigger); + + ovsrcu_postpone_embedded(trigger_cb, &e, rcu_node); + + /* Check that GC holds out until all threads are quiescent. */ + timeout = time_msec(); + if (multithread) { + timeout += 200; + } + while (time_msec() <= timeout) { + ovs_assert(seq_read(e.trigger) == seqno); + } + + atomic_store(&e.wait, false); + + seq_wait(e.trigger, seqno); + poll_timer_wait_until(time_msec() + 200); + poll_block(); + + /* Verify that GC executed. 
*/ + ovs_assert(seq_read(e.trigger) != seqno); + seq_destroy(e.trigger); + + if (multithread) { + xpthread_join(waiter, NULL); + } +} + +#define N_ORDER_CBS 5 + +struct order_element { + struct ovsrcu_node rcu_node; + int id; + int *log; + int *log_idx; +}; + +static void +order_cb(void *aux) +{ + struct order_element *e = aux; + e->log[(*e->log_idx)++] = e->id; +} + +static void +test_rcu_ordering(void) +{ + struct order_element elems[N_ORDER_CBS]; + int log[N_ORDER_CBS]; + int log_idx = 0; + + for (int i = 0; i < N_ORDER_CBS; i++) { + elems[i].id = i; + elems[i].log = log; + elems[i].log_idx = &log_idx; + ovsrcu_postpone_embedded(order_cb, &elems[i], rcu_node); + } + + ovsrcu_barrier(); + + ovs_assert(log_idx == N_ORDER_CBS); + for (int i = 0; i < N_ORDER_CBS; i++) { + if (log[i] != i) { + ovs_abort(0, "RCU embedded callback ordering violated: " + "expected cb %d at position %d, got %d", + i, i, log[i]); + } + } +} + static void test_rcu(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) { + const bool multithread = true; + + /* Execute single-threaded check before spawning additional threads. */ + test_rcu_postpone_embedded(!multithread); + test_rcu_postpone_embedded(multithread); + test_rcu_quiesce(); test_rcu_barrier(); + test_rcu_ordering(); } OVSTEST_REGISTER("test-rcu", test_rcu); -- 2.34.1 _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
