From b1ed3c11c5457a37b2e1de7fba0f50b6b5d7b8e9 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Thu, 22 Jun 2017 18:40:27 +1200
Subject: [PATCH] Add kqueue support to WaitEventSet.

Add a kqueue based implementation of WaitEventSet for use on BSD-family
operating systems including FreeBSD, NetBSD, OpenBSD and macOS.  This is
similar to the epoll implementation for Linux systems in that the set of
kernel objects to monitor can be set up once and then reused, allowing for
more efficient IO multiplexing than poll().

Author: Thomas Munro
Reviewed-By: Marko Tiikkaja
Tested-By: Mateusz Guzik, Matteo Beccati, Keith Fiske, Heikki Linnakangas
Discussion: https://postgr.es/m/CAEepm%3D37oF84-iXDTQ9MrGjENwVGds%2B5zTr38ca73kWR7ez_tA%40mail.gmail.com
---
 configure                       |   4 +-
 configure.in                    |   4 +-
 src/backend/storage/ipc/latch.c | 291 +++++++++++++++++++++++++++++++-
 src/include/pg_config.h.in      |   6 +
 4 files changed, 300 insertions(+), 5 deletions(-)

diff --git a/configure b/configure
index 6414ec1ea6d..ec9db016017 100755
--- a/configure
+++ b/configure
@@ -12711,7 +12711,7 @@ $as_echo "#define HAVE_STDBOOL_H 1" >>confdefs.h
 fi
 
 
-for ac_header in atomic.h crypt.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h sys/epoll.h sys/ipc.h sys/prctl.h sys/procctl.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/sockio.h sys/tas.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h
+for ac_header in atomic.h crypt.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h sys/epoll.h sys/event.h sys/ipc.h sys/prctl.h sys/procctl.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/sockio.h sys/tas.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h
 do :
   as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh`
 ac_fn_c_check_header_mongrel "$LINENO" "$ac_header" "$as_ac_Header" "$ac_includes_default"
@@ -15100,7 +15100,7 @@ fi
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-for ac_func in cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l
+for ac_func in cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l kqueue memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l
 do :
   as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
 ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
diff --git a/configure.in b/configure.in
index 158d5a1ac82..bfb78124d72 100644
--- a/configure.in
+++ b/configure.in
@@ -1292,7 +1292,7 @@ AC_SUBST(UUID_LIBS)
 
 AC_HEADER_STDBOOL
 
-AC_CHECK_HEADERS([atomic.h crypt.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h sys/epoll.h sys/ipc.h sys/prctl.h sys/procctl.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/sockio.h sys/tas.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h])
+AC_CHECK_HEADERS([atomic.h crypt.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h sys/epoll.h sys/event.h sys/ipc.h sys/prctl.h sys/procctl.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/sockio.h sys/tas.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h])
 
 # On BSD, test for net/if.h will fail unless sys/socket.h
 # is included first.
@@ -1571,7 +1571,7 @@ PGAC_FUNC_WCSTOMBS_L
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-AC_CHECK_FUNCS([cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l kqueue memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l])
 
 AC_REPLACE_FUNCS(fseeko)
 case $host_os in
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index f6dda9cc9ac..14af4fbd598 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -39,6 +39,9 @@
 #ifdef HAVE_SYS_EPOLL_H
 #include <sys/epoll.h>
 #endif
+#ifdef HAVE_SYS_EVENT_H
+#include <sys/event.h>
+#endif
 #ifdef HAVE_POLL_H
 #include <poll.h>
 #endif
@@ -59,10 +62,12 @@
  * define somewhere before this block.
  */
 #if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || \
-	defined(WAIT_USE_WIN32)
+	defined(WAIT_USE_KQUEUE) || defined(WAIT_USE_WIN32)
 /* don't overwrite manual choice */
 #elif defined(HAVE_SYS_EPOLL_H)
 #define WAIT_USE_EPOLL
+#elif defined(HAVE_KQUEUE)
+#define WAIT_USE_KQUEUE
 #elif defined(HAVE_POLL)
 #define WAIT_USE_POLL
 #elif WIN32
@@ -96,6 +101,10 @@ struct WaitEventSet
 	int			epoll_fd;
 	/* epoll_wait returns events in a user provided arrays, allocate once */
 	struct epoll_event *epoll_ret_events;
+#elif defined(WAIT_USE_KQUEUE)
+	int			kqueue_fd;
+	/* kevent returns events in a user provided arrays, allocate once */
+	struct kevent *kqueue_ret_events;
 #elif defined(WAIT_USE_POLL)
 	/* poll expects events to be waited on every poll() call, prepare once */
 	struct pollfd *pollfds;
@@ -128,6 +137,8 @@ static void drainSelfPipe(void);
 
 #if defined(WAIT_USE_EPOLL)
 static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
+#elif defined(WAIT_USE_KQUEUE)
+static void WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events);
 #elif defined(WAIT_USE_POLL)
 static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
 #elif defined(WAIT_USE_WIN32)
@@ -534,6 +545,8 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 
 #if defined(WAIT_USE_EPOLL)
 	sz += MAXALIGN(sizeof(struct epoll_event) * nevents);
+#elif defined(WAIT_USE_KQUEUE)
+	sz += MAXALIGN(sizeof(struct kevent) * nevents);
 #elif defined(WAIT_USE_POLL)
 	sz += MAXALIGN(sizeof(struct pollfd) * nevents);
 #elif defined(WAIT_USE_WIN32)
@@ -552,6 +565,9 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 #if defined(WAIT_USE_EPOLL)
 	set->epoll_ret_events = (struct epoll_event *) data;
 	data += MAXALIGN(sizeof(struct epoll_event) * nevents);
+#elif defined(WAIT_USE_KQUEUE)
+	set->kqueue_ret_events = (struct kevent *) data;
+	data += MAXALIGN(sizeof(struct kevent) * nevents);
 #elif defined(WAIT_USE_POLL)
 	set->pollfds = (struct pollfd *) data;
 	data += MAXALIGN(sizeof(struct pollfd) * nevents);
@@ -576,6 +592,10 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 	if (fcntl(set->epoll_fd, F_SETFD, FD_CLOEXEC) == -1)
 		elog(ERROR, "fcntl(F_SETFD) failed on epoll descriptor: %m");
 #endif							/* EPOLL_CLOEXEC */
+#elif defined(WAIT_USE_KQUEUE)
+	set->kqueue_fd = kqueue();
+	if (set->kqueue_fd < 0)
+		elog(ERROR, "kqueue failed: %m");
 #elif defined(WAIT_USE_WIN32)
 
 	/*
@@ -608,6 +628,8 @@ FreeWaitEventSet(WaitEventSet *set)
 {
 #if defined(WAIT_USE_EPOLL)
 	close(set->epoll_fd);
+#elif defined(WAIT_USE_KQUEUE)
+	close(set->kqueue_fd);
 #elif defined(WAIT_USE_WIN32)
 	WaitEvent  *cur_event;
 
@@ -717,6 +739,8 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 	/* perform wait primitive specific initialization, if needed */
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
+#elif defined(WAIT_USE_KQUEUE)
+	WaitEventAdjustKqueue(set, event, 0);
 #elif defined(WAIT_USE_POLL)
 	WaitEventAdjustPoll(set, event);
 #elif defined(WAIT_USE_WIN32)
@@ -736,10 +760,16 @@ void
 ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 {
 	WaitEvent  *event;
+#if defined(WAIT_USE_KQUEUE)
+	int old_events;
+#endif
 
 	Assert(pos < set->nevents);
 
 	event = &set->events[pos];
+#if defined(WAIT_USE_KQUEUE)
+	old_events = event->events;
+#endif
 
 	/*
 	 * If neither the event mask nor the associated latch changes, return
@@ -773,6 +803,8 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
+#elif defined(WAIT_USE_KQUEUE)
+	WaitEventAdjustKqueue(set, event, old_events);
 #elif defined(WAIT_USE_POLL)
 	WaitEventAdjustPoll(set, event);
 #elif defined(WAIT_USE_WIN32)
@@ -863,6 +895,127 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 }
 #endif
 
+#if defined(WAIT_USE_KQUEUE)
+
+static inline void
+WaitEventAdjustKqueueAdd(struct kevent *k_ev, int filter, int action,
+						 WaitEvent *event)
+{
+	k_ev->ident = event->fd;
+	k_ev->filter = filter;
+	k_ev->flags = action | EV_CLEAR;
+	k_ev->fflags = 0;
+	k_ev->data = 0;
+
+	/*
+	 * On most BSD family systems, udata is of type void * so we could simply
+	 * assign event to it without casting, or use the EV_SET macro instead of
+	 * filling in the struct manually.  Unfortunately, NetBSD and possibly
+	 * others have it as intptr_t, so here we wallpaper over that difference
+	 * with an unsightly lvalue cast.
+	 */
+	*((WaitEvent **)(&k_ev->udata)) = event;
+}
+
+static inline void
+WaitEventAdjustKqueueAddPostmaster(struct kevent *k_ev, WaitEvent *event)
+{
+	/* For now postmaster death can only be added, not removed. */
+	k_ev->ident = PostmasterPid;
+	k_ev->filter = EVFILT_PROC;
+	k_ev->flags = EV_ADD | EV_CLEAR;
+	k_ev->fflags = NOTE_EXIT;
+	k_ev->data = 0;
+	*((WaitEvent **)(&k_ev->udata)) = event;
+}
+
+/*
+ * old_events is the previous event mask, used to compute what has changed.
+ */
+static void
+WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
+{
+	int rc;
+	struct kevent k_ev[2];
+	int count = 0;
+	bool new_filt_read = false;
+	bool old_filt_read = false;
+	bool new_filt_write = false;
+	bool old_filt_write = false;
+
+	if (old_events == event->events)
+		return;
+
+	Assert(event->events != WL_LATCH_SET || set->latch != NULL);
+	Assert(event->events == WL_LATCH_SET ||
+		   event->events == WL_POSTMASTER_DEATH ||
+		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+
+	if (event->events == WL_POSTMASTER_DEATH)
+	{
+		/*
+		 * Unlike all the other implementations, we detect postmaster death
+		 * using process notification instead of waiting on the postmaster
+		 * alive pipe.
+		 */
+		WaitEventAdjustKqueueAddPostmaster(&k_ev[count++], event);
+	}
+	else
+	{
+		/*
+		 * We need to compute the adds and deletes required to get from the
+		 * old event mask to the new event mask, since kevent treats readable
+		 * and writable as separate events.
+		 */
+		if (old_events == WL_LATCH_SET ||
+			(old_events & WL_SOCKET_READABLE))
+				old_filt_read = true;
+		if (event->events == WL_LATCH_SET ||
+			event->events == WL_POSTMASTER_DEATH ||
+			(event->events & WL_SOCKET_READABLE))
+			new_filt_read = true;
+		if (old_events & WL_SOCKET_WRITEABLE)
+			old_filt_write = true;
+		if (event->events & WL_SOCKET_WRITEABLE)
+			new_filt_write = true;
+		if (old_filt_read && !new_filt_read)
+			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_DELETE,
+									 event);
+		else if (!old_filt_read && new_filt_read)
+			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_ADD,
+									 event);
+		if (old_filt_write && !new_filt_write)
+			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_DELETE,
+									 event);
+		else if (!old_filt_write && new_filt_write)
+			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_ADD,
+									 event);
+	}
+
+	Assert(count > 0);
+	Assert(count <= 2);
+
+	rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
+
+	if (rc < 0 && event->events == WL_POSTMASTER_DEATH && errno == ESRCH)
+	{
+		/*
+		 * The postmaster is already dead.  Defer reporting this to the caller
+		 * until wait time, for compatibility with the other implementations.
+		 * To do that we will now add the regular alive pipe.
+		 */
+		WaitEventAdjustKqueueAdd(&k_ev[0], EVFILT_READ, EV_ADD, event);
+		rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
+	}
+
+	if (rc < 0)
+		ereport(ERROR,
+				(errcode_for_socket_access(),
+				 errmsg("kevent() failed: %m")));
+}
+
+#endif
+
 #if defined(WAIT_USE_WIN32)
 static void
 WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
@@ -1150,6 +1303,142 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 	return returned_events;
 }
 
+#elif defined(WAIT_USE_KQUEUE)
+
+/*
+ * Wait using FreeBSD kqueue(2)/kevent(2).  Also available on other BSD-family
+ * systems including macOS.
+ *
+ * This is the preferrable wait method for systems that have it, as several
+ * readiness notifications are delivered, without having to iterate through
+ * all of set->events.
+ *
+ * For now this mirrors the epoll code, but in future it could modify the fd
+ * set in the same call to kevent as it uses for waiting instead of doing that
+ * with separate system calls.
+ */
+static int
+WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
+					  WaitEvent *occurred_events, int nevents)
+{
+	int			returned_events = 0;
+	int			rc;
+	WaitEvent  *cur_event;
+	struct kevent *cur_kqueue_event;
+	struct timespec timeout;
+	struct timespec *timeout_p;
+
+	if (cur_timeout < 0)
+		timeout_p = NULL;
+	else
+	{
+		timeout.tv_sec = cur_timeout / 1000;
+		timeout.tv_nsec = (cur_timeout % 1000) * 1000000;
+		timeout_p = &timeout;
+	}
+
+	/* Sleep */
+	rc = kevent(set->kqueue_fd, NULL, 0,
+				set->kqueue_ret_events, nevents,
+				timeout_p);
+
+	/* Check return code */
+	if (rc < 0)
+	{
+		/* EINTR is okay, otherwise complain */
+		if (errno != EINTR)
+		{
+			waiting = false;
+			ereport(ERROR,
+					(errcode_for_socket_access(),
+					 errmsg("kevent() failed while trying to wait: %m")));
+		}
+		return 0;
+	}
+	else if (rc == 0)
+	{
+		/* timeout exceeded */
+		return -1;
+	}
+
+	/*
+	 * At least one event occurred, iterate over the returned kqueue events
+	 * until they're either all processed, or we've returned all the events
+	 * the caller desired.
+	 */
+	for (cur_kqueue_event = set->kqueue_ret_events;
+		 cur_kqueue_event < (set->kqueue_ret_events + rc) &&
+		 returned_events < nevents;
+		 cur_kqueue_event++)
+	{
+		/* kevent's data pointer is set to the associated WaitEvent */
+		cur_event = (WaitEvent *) cur_kqueue_event->udata;
+
+		occurred_events->pos = cur_event->pos;
+		occurred_events->user_data = cur_event->user_data;
+		occurred_events->events = 0;
+
+		if (cur_event->events == WL_LATCH_SET &&
+			cur_kqueue_event->filter == EVFILT_READ)
+		{
+			/* There's data in the self-pipe, clear it. */
+			drainSelfPipe();
+
+			if (set->latch->is_set)
+			{
+				occurred_events->fd = PGINVALID_SOCKET;
+				occurred_events->events = WL_LATCH_SET;
+				occurred_events++;
+				returned_events++;
+			}
+		}
+		else if (cur_event->events == WL_POSTMASTER_DEATH &&
+				 ((cur_kqueue_event->filter == EVFILT_PROC &&
+				   (cur_kqueue_event->fflags & NOTE_EXIT) != 0) ||
+				  cur_kqueue_event->filter == EVFILT_READ))
+		{
+			/*
+			 * Postmaster death notification usually triggers filter
+			 * EVFILT_PROC.  But if the postmaster process was aready dead
+			 * when we tried to add it to the kqueue, then
+			 * WaitEventAdjustKqueue adds the alive pipe instead, which
+			 * triggers EVFILT_READ here when we try to wait.
+			 */
+			occurred_events->fd = PGINVALID_SOCKET;
+			occurred_events->events = WL_POSTMASTER_DEATH;
+			occurred_events++;
+			returned_events++;
+		}
+		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		{
+			Assert(cur_event->fd >= 0);
+
+			if ((cur_event->events & WL_SOCKET_READABLE) &&
+				(cur_kqueue_event->filter == EVFILT_READ))
+			{
+				/* readable, or EOF */
+				occurred_events->events |= WL_SOCKET_READABLE;
+			}
+
+			if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
+				(cur_kqueue_event->filter == EVFILT_WRITE))
+			{
+				/* writable, or EOF */
+				occurred_events->events |= WL_SOCKET_WRITEABLE;
+			}
+
+			if (occurred_events->events != 0)
+			{
+				occurred_events->fd = cur_event->fd;
+				occurred_events++;
+				returned_events++;
+			}
+		}
+	}
+
+	return returned_events;
+}
+
 #elif defined(WAIT_USE_POLL)
 
 /*
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 90dda8ea050..4bcabc3b381 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -330,6 +330,9 @@
 /* Define to 1 if you have isinf(). */
 #undef HAVE_ISINF
 
+/* Define to 1 if you have the `kqueue' function. */
+#undef HAVE_KQUEUE
+
 /* Define to 1 if you have the <langinfo.h> header file. */
 #undef HAVE_LANGINFO_H
 
@@ -598,6 +601,9 @@
 /* Define to 1 if you have the <sys/epoll.h> header file. */
 #undef HAVE_SYS_EPOLL_H
 
+/* Define to 1 if you have the <sys/event.h> header file. */
+#undef HAVE_SYS_EVENT_H
+
 /* Define to 1 if you have the <sys/ipc.h> header file. */
 #undef HAVE_SYS_IPC_H
 
-- 
2.17.1 (Apple Git-112)

