Hello, everyone,

I got some very positive feedback last time, so I decided to work on a new
version. I've streamlined things a bit, introduced a key type to handle
lookups, and moved some code into new functions.

The biggest change, however, is the 'gsync_requeue' RPC. It allows moving
blocked threads so that they wait on a different address. It's not as
useful as the other two RPCs, but it can be used to optimize POSIX
condition variables to avoid the infamous 'thundering herd' problem.
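
To make that concrete, here is a rough userspace sketch (not part of the
patch) of how a condition variable could use 'gsync_requeue' in its
broadcast path. The mutex type, the stub prototypes, the flag value and
the helper names are my own assumptions for illustration; only the RPC
signatures come from the gnumach.defs changes below.

/* Hypothetical sketch: a condition variable whose broadcast requeues
 * waiters onto the mutex word, so they are released one at a time
 * instead of all stampeding for the mutex at once. */

#include <mach.h>

/* Flag value assumed for illustration; the real definition lives in
 * the patch's gsync.h. */
#define GSYNC_BROADCAST   0x08

/* Prototypes of the MIG-generated stubs, following gnumach.defs. */
extern kern_return_t gsync_wait (task_t task, vm_offset_t addr,
  unsigned val1, unsigned val2, natural_t msec, int flags);
extern kern_return_t gsync_requeue (task_t task, vm_offset_t src,
  vm_offset_t dst, boolean_t wake_one, int flags);

/* Assumed to be gsync-based: lockers block on &MTX->state with
 * 'gsync_wait', and mutex_unlock wakes one of them with 'gsync_wake'. */
struct mutex { unsigned int state; };
extern void mutex_lock (struct mutex *mtx);
extern void mutex_unlock (struct mutex *mtx);

struct condvar { unsigned int seq; };

void
condvar_wait (struct condvar *cv, struct mutex *mtx)
{
  unsigned int seq = __atomic_load_n (&cv->seq, __ATOMIC_ACQUIRE);
  mutex_unlock (mtx);
  /* Sleep only if nobody has bumped the sequence number yet; the
   * kernel rechecks the value under the bucket lock. */
  gsync_wait (mach_task_self (), (vm_offset_t)&cv->seq, seq, 0, 0, 0);
  mutex_lock (mtx);
}

void
condvar_broadcast (struct condvar *cv, struct mutex *mtx)
{
  __atomic_add_fetch (&cv->seq, 1, __ATOMIC_RELEASE);
  /* Wake exactly one waiter and move the rest onto the mutex word;
   * each later mutex_unlock then releases one more of them, which
   * avoids the thundering herd of a plain broadcast wakeup. */
  gsync_requeue (mach_task_self (), (vm_offset_t)&cv->seq,
    (vm_offset_t)&mtx->state, TRUE, GSYNC_BROADCAST);
}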

This patch is a bit longer than the one before, so I'm sending it as an
attachment instead of copying its contents here.

As before, any comments, advice or questions are very welcome.

diff --git a/include/mach/gnumach.defs b/include/mach/gnumach.defs
index dd4da87..5235df6 100644
--- a/include/mach/gnumach.defs
+++ b/include/mach/gnumach.defs
@@ -84,3 +84,55 @@ simpleroutine task_set_name(
 routine register_new_task_notification(
 		host_priv	: host_priv_t;
 		notification	: mach_port_send_t);
+
+/* Test that the contents of ADDR are equal to the 32-bit integer VAL1.
+ * If they are not, return immediately, otherwise, block until a
+ * matching 'gsync_wake' is done on the same address. FLAGS is used
+ * to control how the thread waits, and may be composed of:
+ * - GSYNC_SHARED: The address may be shared among tasks. If this
+ *   bit is not set, the address is assumed to be task-local.
+ * - GSYNC_QUAD: Additionally check that the adjacent 32-bit word
+ *   following ADDR matches the value VAL2.
+ * - GSYNC_TIMED: The call only blocks for MSEC milliseconds. */
+routine gsync_wait(
+  task : task_t;
+  addr : vm_offset_t;
+  val1 : unsigned;
+  val2 : unsigned;
+  msec : natural_t;
+  flags : int);
+
+/* Wake up threads waiting on the address ADDR. Much like with
+ * 'gsync_wait', the parameter FLAGS controls how it is done. In this
+ * case, it may be composed of the following:
+ * - GSYNC_SHARED: Same as with 'gsync_wait'.
+ * - GSYNC_BROADCAST: Wake up every thread waiting on the address. If
+ *   this flag is not set, the call wakes (at most) 1 thread.
+ * - GSYNC_MUTATE: Before waking any potential waiting threads, set the
+ *   contents of ADDR to VAL.
+ *
+ * This RPC is implemented as a simple routine for efficiency reasons,
+ * and because the return value rarely matters. */
+simpleroutine gsync_wake(
+  task : task_t;
+  addr : vm_offset_t;
+  val : unsigned;
+  flags : int);
+
+/* Arrange for threads waiting on address SRC_ADDR to instead
+ * wait on address DST_ADDR. If WAKE_ONE is true, additionally
+ * wake one of the threads waiting on SRC_ADDR. For this function,
+ * the parameter FLAGS may be a combination of:
+ * - GSYNC_SHARED: Just like with 'gsync_wait' and 'gsync_wake'.
+ * - GSYNC_BROADCAST: Move all the threads waiting on SRC_ADDR. If
+ *   this flag is not set, the call moves (at most) 1 thread.
+ *
+ * This RPC is also implemented as a simple routine, for the same
+ * reasons as 'gsync_wake'. */
+simpleroutine gsync_requeue(
+  task : task_t;
+  src_addr : vm_offset_t;
+  dst_addr : vm_offset_t;
+  wake_one : boolean_t;
+  flags : int);
+
diff --git a/kern/gsync.c b/kern/gsync.c
new file mode 100644
index 0000000..12398ab
--- /dev/null
+++ b/kern/gsync.c
@@ -0,0 +1,381 @@
+/* Copyright (C) 2016 Free Software Foundation, Inc.
+   Contributed by Agustina Arzille <avarzi...@riseup.net>, 2016.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License
+   as published by the Free Software Foundation; either
+   version 3 of the license, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public
+   License along with this program; if not, see
+   <http://www.gnu.org/licenses/>.
+*/
+
+#include <kern/gsync.h>
+#include <kern/sched_prim.h>
+#include <kern/thread.h>
+#include <kern/lock.h>
+#include <kern/list.h>
+#include <vm/vm_map.h>
+
+/* An entry in the global hash table. */
+struct gsync_hbucket
+{
+  struct list entries;
+  decl_simple_lock_data (, lock)
+};
+
+/* A key used to uniquely identify an address that a thread is
+ * waiting on. Its members' values depend on whether said
+ * address is shared or task-local. */
+struct gsync_key
+{
+  unsigned long u;
+  unsigned long v;
+};
+
+/* A thread that is blocked on an address with 'gsync_wait'. */
+struct gsync_waiter
+{
+  struct list link;
+  struct gsync_key key;
+  thread_t waiter;
+};
+
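+/* Size of the global hash table of waiter buckets. */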
+#define GSYNC_NBUCKETS   512
+static struct gsync_hbucket gsync_buckets[GSYNC_NBUCKETS];
+
+void gsync_setup (void)
+{
+  for (int i = 0; i < GSYNC_NBUCKETS; ++i)
+    {
+      list_init (&gsync_buckets[i].entries);
+      simple_lock_init (&gsync_buckets[i].lock);
+    }
+}
+
+/* Convenience comparison functions for gsync_key's. */
+
+static inline int
+gsync_key_eq (const struct gsync_key *lp,
+  const struct gsync_key *rp)
+{
+  return (lp->u == rp->u && lp->v == rp->v);
+}
+
+static inline int
+gsync_key_lt (const struct gsync_key *lp,
+  const struct gsync_key *rp)
+{
+  return (lp->u < rp->u || (lp->u == rp->u && lp->v < rp->v));
+}
+
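+/* Hash mixing step: rotate the 32-bit accumulator left by 5 bits
+ * and XOR in the next word of the key. */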
+#define MIX2_LL(x, y)   ((((x) << 5) | ((x) >> 27)) ^ (y))
+
+static inline unsigned int
+gsync_key_hash (const struct gsync_key *keyp)
+{
+  unsigned int ret = sizeof (void *);
+#ifndef __LP64__
+  ret = MIX2_LL (ret, keyp->u);
+  ret = MIX2_LL (ret, keyp->v);
+#else
+  ret = MIX2_LL (ret, keyp->u & ~0U);
+  ret = MIX2_LL (ret, keyp->u >> 32);
+  ret = MIX2_LL (ret, keyp->v & ~0U);
+  ret = MIX2_LL (ret, keyp->v >> 32);
+#endif
+  return (ret);
+}
+
+/* Test whether the passed VM map can access the address range
+ * [ADDR, ADDR + SIZE) with protection PROT. */
+static int
+valid_access_p (vm_map_t map, vm_offset_t addr,
+  vm_offset_t size, vm_prot_t prot)
+{
+  vm_map_entry_t entry;
+  return (vm_map_lookup_entry (map, addr, &entry) &&
+    entry->vme_end >= addr + size &&
+    (prot & entry->protection) == prot);
+}
+
+/* Given a task and an address, fill the waiter in *OUTP and return
+ * the index of the corresponding bucket. FLAGS specifies several
+ * things about the address (width, protection, whether it's shared
+ * or task-local). This can currently only fail when given an
+ * invalid address. */
+static int
+gsync_fill_waiter (task_t task, vm_offset_t addr,
+  int flags, struct gsync_waiter *outp)
+{
+  vm_prot_t prot = VM_PROT_READ |
+    ((flags & GSYNC_MUTATE) ? VM_PROT_WRITE : 0);
+  vm_offset_t size = sizeof (unsigned int) *
+    ((flags & GSYNC_QUAD) ? 2 : 1);
+
+  if (unlikely (!valid_access_p (task->map, addr, size, prot)))
+    return (-1);
+
+  if (flags & GSYNC_SHARED)
+    {
+      /* For a shared address, we need the VM object
+       * and offset as the keys. */
+      vm_map_t map = task->map;
+      vm_map_version_t ver;
+      vm_prot_t rpr;
+      vm_object_t obj;
+      vm_offset_t off;
+      boolean_t wired_p;
+
+      if (unlikely (vm_map_lookup (&map, addr, prot, &ver,
+          &obj, &off, &rpr, &wired_p) != KERN_SUCCESS))
+        return (-1);
+
+      outp->key.u = (unsigned long)obj;
+      outp->key.v = (unsigned long)off;
+    }
+  else
+    {
+      /* Task-local address. The keys are the task's map and
+       * the virtual address itself. */
+      outp->key.u = (unsigned long)task->map;
+      outp->key.v = (unsigned long)addr;
+    }
+
+  return ((int)(gsync_key_hash (&outp->key) % GSYNC_NBUCKETS));
+}
+
+static inline struct gsync_waiter*
+node_to_waiter (struct list *nodep)
+{
+  return (list_entry (nodep, struct gsync_waiter, link));
+}
+
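+/* Implementation of the 'gsync_wait' RPC. LO is the expected value
+ * at ADDR; HI is the expected value of the following word, checked
+ * only when GSYNC_QUAD is set. */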
+kern_return_t gsync_wait (task_t task, vm_offset_t addr,
+  unsigned int lo, unsigned int hi, natural_t msec, int flags)
+{
+  struct gsync_waiter w;
+  int bucket = gsync_fill_waiter (task, addr, flags, &w);
+
+  if (bucket < 0)
+    return (KERN_INVALID_ADDRESS);
+
+  struct gsync_hbucket *hbp = gsync_buckets + bucket;
+  simple_lock (&hbp->lock);
+
+  /* Before doing any work, check that the expected value(s)
+   * match the contents of the address. Otherwise, the waiting
+   * thread could potentially miss a wakeup. */
+  if (((unsigned int *)addr)[0] != lo ||
+      ((flags & GSYNC_QUAD) &&
+        ((unsigned int *)addr)[1] != hi))
+    {
+      simple_unlock (&hbp->lock);
+      return (KERN_INVALID_ARGUMENT);
+    }
+
+  /* Look for the first entry in the hash bucket that
+   * compares greater than this waiter. */
+  struct list *runp;
+  list_for_each (&hbp->entries, runp)
+    {
+      struct gsync_waiter *p = node_to_waiter (runp);
+      if (gsync_key_lt (&w.key, &p->key))
+        break;
+    }
+
+  /* Finally, add ourselves to the list and go to sleep. */
+  list_add (runp->prev, runp, &w.link);
+  w.waiter = current_thread ();
+
+  if (flags & GSYNC_TIMED)
+    thread_will_wait_with_timeout (w.waiter, msec);
+  else
+    thread_will_wait (w.waiter);
+
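+  /* thread_sleep drops the bucket lock once we're committed to
+   * sleeping, so a concurrent 'gsync_wake' cannot slip in between
+   * the value check above and the actual sleep. */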
+  thread_sleep (0, (simple_lock_t)&hbp->lock, TRUE);
+
+  /* We're back. */
+  kern_return_t ret = current_thread()->wait_result;
+  if (ret != THREAD_AWAKENED)
+    {
+      /* We were interrupted or timed out. */
+      simple_lock (&hbp->lock);
+      if (w.link.next != NULL)
+        list_remove (&w.link);
+      simple_unlock (&hbp->lock);
+
+      /* XXX: These codes aren't really descriptive, but it's
+       * the best I can think of right now. */
+      ret = ret == THREAD_INTERRUPTED ?
+        KERN_ABORTED : KERN_TERMINATED;
+    }
+  else
+    ret = KERN_SUCCESS;
+
+  return (ret);
+}
+
+/* Remove a waiter from the queue, wake it up, and
+ * return the next node. */
+static inline struct list*
+dequeue_waiter (struct list *nodep)
+{
+  struct list *nextp = list_next (nodep);
+  list_remove (nodep);
+  list_node_init (nodep);
+  clear_wait (node_to_waiter(nodep)->waiter,
+    THREAD_AWAKENED, FALSE);
+  return (nextp);
+}
+
+static inline struct list*
+gsync_find_key (const struct list *entries,
+  const struct gsync_key *keyp, int *exactp)
+{
+  /* Look for a key that matches. We take advantage of the fact
+   * that the entries are sorted to break out of the loop as
+   * early as possible. */
+  struct list *runp;
+  list_for_each (entries, runp)
+    {
+      struct gsync_waiter *p = node_to_waiter (runp);
+      if (gsync_key_lt (keyp, &p->key))
+        break;
+      else if (gsync_key_eq (keyp, &p->key))
+        {
+          if (exactp != 0)
+            *exactp = 1;
+          break;
+        }
+    }
+
+  return (runp);
+}
+
+kern_return_t gsync_wake (task_t task,
+  vm_offset_t addr, unsigned int val, int flags)
+{
+  struct gsync_waiter w;
+  int bucket = gsync_fill_waiter (task, addr, flags, &w);
+
+  if (bucket < 0)
+    return (KERN_INVALID_ADDRESS);
+
+  struct gsync_hbucket *hbp = gsync_buckets + bucket;
+
+  /* When the broadcast bit is set, we wake every waiter
+   * that matches. Setting the count to UINT_MAX should do
+   * the trick until we have to manage a ridiculously large
+   * number of waiters. */
+  unsigned int nw = (flags & GSYNC_BROADCAST) ? ~0U : 1;
+  kern_return_t ret = KERN_INVALID_ARGUMENT;
+  simple_lock (&hbp->lock);
+
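+  /* With GSYNC_MUTATE, publish the new value before waking anyone,
+   * so that the woken waiters observe it. */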
+  if (flags & GSYNC_MUTATE)
+    __atomic_store_n ((unsigned int *)addr,
+      val, __ATOMIC_RELEASE);
+
+  int found = 0;
+  struct list *runp = gsync_find_key (&hbp->entries, &w.key, &found);
+  if (found)
+    {
+      do
+        runp = dequeue_waiter (runp);
+      while (--nw != 0 && !list_end (&hbp->entries, runp) &&
+        gsync_key_eq (&node_to_waiter(runp)->key, &w.key));
+
+      ret = KERN_SUCCESS;
+    }
+
+  simple_unlock (&hbp->lock);
+  return (ret);
+}
+
+kern_return_t gsync_requeue (task_t task, vm_offset_t src,
+  vm_offset_t dst, boolean_t wake_one, int flags)
+{
+  struct gsync_waiter src_w, dst_w;
+  int src_bkt = gsync_fill_waiter (task, src, flags, &src_w);
+  int dst_bkt = gsync_fill_waiter (task, dst, flags, &dst_w);
+
+  if ((src_bkt | dst_bkt) < 0)
+    return (KERN_INVALID_ADDRESS);
+
+  /* If we're also asked to wake up a waiter, we may need to
+   * handle up to two threads from the source queue: one to
+   * wake and one to requeue. */
+  unsigned int nw = (flags & GSYNC_BROADCAST) ? ~0U : 1 + wake_one;
+  struct gsync_hbucket *bp1 = gsync_buckets + src_bkt;
+  struct gsync_hbucket *bp2 = gsync_buckets + dst_bkt;
+
+  /* Acquire the locks in order, to prevent any potential deadlock. */
+  if (bp1 == bp2)
+    simple_lock (&bp1->lock);
+  else if ((unsigned long)bp1 < (unsigned long)bp2)
+    {
+      simple_lock (&bp1->lock);
+      simple_lock (&bp2->lock);
+    }
+  else
+    {
+      simple_lock (&bp2->lock);
+      simple_lock (&bp1->lock);
+    }
+
+  int ret, exact = 0;
+  struct list *inp = gsync_find_key (&bp1->entries, &src_w.key, &exact);
+  if (!exact)
+    /* There are no waiters in the source queue. */
+    ret = KERN_INVALID_ARGUMENT;
+  else
+    {
+      struct list *outp = gsync_find_key (&bp2->entries, &dst_w.key, 0);
+
+      /* We're going to need a node that points one past the
+       * end of the waiters in the source queue. */
+      struct list *endp = inp;
+
+      do
+        {
+          /* Modify the keys while iterating. */
+          node_to_waiter(endp)->key = dst_w.key;
+          endp = list_next (endp);
+        }
+      while (--nw != 0 && !list_end (&bp1->entries, endp) &&
+        gsync_key_eq (&node_to_waiter(endp)->key, &src_w.key));
+
+      /* Splice the segment [INP .. ENDP) out of the source queue and
+       * insert it into the destination queue right before OUTP, so
+       * that the entries stay sorted by key. */
+      struct list *last = endp->prev;
+
+      inp->prev->next = endp;
+      endp->prev = inp->prev;
+
+      last->next = outp;
+      inp->prev = outp->prev;
+      outp->prev->next = inp;
+      outp->prev = last;
+
+      if (wake_one)
+        (void)dequeue_waiter (inp);
+    }
+
+  /* Release the locks and we're done. */
+  if (bp1 == bp2)
+    simple_unlock (&bp1->lock);
+  else if ((unsigned long)bp1 < (unsigned long)bp2)
+    {
+      simple_unlock (&bp2->lock);
+      simple_unlock (&bp1->lock);
+    }
+  else
+    {
+      simple_unlock (&bp1->lock);
+      simple_unlock (&bp2->lock);
+    }
+
+  return (ret);
+}
+
