/**

   "RTSIG" is a shorthand for using O_ASYNC to make descriptors send
   signals when readable/writable and to use POSIX real-time signals
   witch are queued unlike normal signals.  At first blush this may
   seem like a alternative to epoll, but a number of problems arise
   when attempting to build an eventloop entirely out of rtsig.
   Still, we can use rtsig in combination with poll() to
   provide an eventloop that allows for many thousands of sockets
   without huge overheads implicit with using select() or poll()
   alone.  epoll and kqueue are far superior to rtsig and should be
   used where available, but rtsig has been in standard Linux kernels
   for a long time and have a huge installation base.  epoll requires
   special patches for 2.4 kernels and 2.6 kernels are not yet nearly
   so ubiquitous.

   rtsig problems:
    - O_ASYNC mechanisms work only on sockets - not pipes or tty's

    - O_ASYNC signals are edge-triggered, POLLIN on packet arriving
   or socket close; POLLOUT when a socket transitions from
   non-writable to writable.  Being edge-triggered means the
   event-handler callbacks must transition the level ( reading
   completely the socket buffer contents ) or it will be unable to
   reliably receive notification again.

   - rtsig implementations must be intimately involved in how a
   process dispatches signals.

   - delivering signals per-event can be expensive, CPU-wise, but
     sigtimedwait() blocks on signals only and means non-sockets
     cannot be serviced.

   Theory of operation:
    This libevent module uses rtsig to allow us to manage a set of
    poll-event descriptors.  We can drop uninteresting fd's from the
    pollset if the fd will send a signal when it becomes interesting
    again.

    poll() offers us level-triggering and, when we have verified the
    level of a socket, we can trust the edge-trigger nature of the
    ASYNC signal.

    As an eventloop we must poll for external events but leverage
    kernel functionality to sleep between events ( until the loop's
    next scheduled timed event ).

    If we are polling on any non-sockets then we simply have no choice
    about blocking on the poll() call.  If we blocked on the
    sigtimedwait() call as rtsig papers recommend we will not wake on
    non-socket state transitions.  As part of libevent, this module
    must support non-socket polling.

    Many applications, however, do not need to poll on non-sockets and
    so this module should be able to optimize this case by using
    sigtimedwait().  For this reason this module can actually trigger
    events in each of three different ways:
      - poll() returning ready events from descriptors in the pollset

      - real-time signals dequeued via sigtimedwait()

      - real-time signals that call an installed signal handler which in
    turn writes the contents of siginfo to one end of a socketpair
    DGRAM socket.  The other end of the socket is always in the
    pollset so poll will be guaranteed to return even if the signal is
    received before entering poll().

    non-socket descriptors force us to block on the poll() for the
    duration of a dispatch.  In this case we unblock (w/ sigprocmask)
    the managed signals just before polling.  Each managed signal is
    handled by signal_handler() which send()'s the contents of siginfo
    over the socketpair.  Otherwise, we call poll() with a timeout of
    0ms so it checks the levels of the fd's in the pollset and returns
    immediately.  Any fd that is a socket and has no active state is
    removed from the pollset for the next pass -- we will rely on
    getting a signal for events on these fd's.

    The receiving end of the siginfo socketpair is in the pollset
    (permanently) so if we are polling on non-sockets, the delivery of
    signals immediately following sigprocmask( SIG_UNBLOCK...) will
    result in a readable op->signal_recv_fd which ensures the poll()
    will return immediately.  If the poll() call is blocking and a
    signal arrives ( possibly a real-time signal from a socket not in
    the pollset ) its handler will write the data to the socketpair
    and interrupt the poll().

    After every poll call we attempt a non-blocking recv from the
    signal_recv_fd and continue to recv and dispatch the events until
    recv indicates the socket buffer is empty.

    One might raise concerns about receiving event activations from
    both poll() and from the rtsig data in the signal_recv_fd.
    Fortunately, libevent is already structured for event coalescing,
    so this issue is mitigated ( though we do some work twice for the
    same event making us less efficient ).  I suspect that the cost of
    turning off the O_ASYNC flag on fd's in the pollset is more
    expensive than handling some events twice.  Looking at the
    kernel's source code for setting O_ASYNC, it looks like it takes a
    global kernel lock...

    After a poll and recv-loop for the signal_recv_fd, we finally do a
    sigtimedwait().  sigtimedwait will only block if we haven't
    blocked in poll() and we have not enqueued events from either the
    poll or the recv-loop.  Because sigtimedwait blocks all signals
    that are not in the set of signals to be dequeued, we need to
    dequeue almost all signals and make sure we dispatch them
    correctly.  We dequeue any signal that is not blocked as well as
    all libevent-managed signals.  If we get a signal that is not
    managed by libevent we lookup the sigaction for the specific
    signal and call that function ourselves.

    Finally, I should mention that getting a SIGIO signal indicates
    that the rtsig buffer has overflowed and we have lost events.
    This forces us to add _every_ descriptor to the pollset to recover.

*/


#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

/* Enable F_SETSIG and F_SETOWN */
#define _GNU_SOURCE

#include <sys/types.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#else
#include <sys/_time.h>
#endif
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/poll.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <unistd.h>
#include <sys/socket.h>

#include "event.h"
#include "event-internal.h"
#include "log.h"
extern struct event_list signalqueue;

#include <linux/unistd.h>
#ifndef __NR_gettid
#define gettid() getpid()
#else

#if ((__GLIBC__ > 2) || ((__GLIBC__ == 2) && (__GLIBC_MINOR__ >= 3)))
_syscall0(pid_t,gettid)
#endif

#endif

#define EVLIST_NONSOCK   0x1000 /* event is for a non-socket file-descriptor */
#define EVLIST_DONTDEL   0x2000 /* event should always be in the pollset */
#define MAXBUFFERSIZE (1024 * 1024 * 2) /* max socketbuffer for signal-spair */
#define INIT_MAX 16     /* init/min # of fd positions in our pollset */

static int signal_send_fd[ _NSIG ]; /* the globalend of the signal socketpair */
static int trouble[ _NSIG ]; /* 1 when signal-handler cant send to signal_send_fd */

struct rtdata;
TAILQ_HEAD( rtdata_list, rtdata );

struct rtsigop {
  sigset_t sigs;        // signal mask for all _managed_ signals
  struct pollfd *poll;  // poll structures
  struct rtdata **ptodat;  // map poll_position to rtdata
  int cur;              // cur # fd's in a poll set
  int max;              // max # fd's in a poll set, start at 16 and grow as needed
  int total;            // count of fd's we are watching now
  int signo;            // the signo we use for ASYNC fd notifications
  int nonsock;          // number of non-socket fd's we are watching
  int highestfd;        // highest fd accomodated by fdtodat
  struct rtdata_list **fdtodat; // map fd to rtdata ( and thus to event )
  int signal_recv_fd;   // recv side of the signal_send_fd
  int signal_send_fd;   // recv side of the signal_send_fd
  struct event sigfdev; // our own event structure for the signal fd
};

struct rtdata {
  /* rtdata holds rtsig-private state on each event */
  TAILQ_ENTRY (rtdata) next;
  struct event *ev;
  int poll_position;
};

void *rtsig_init(void);
int rtsig_add(void *, struct event *);
int rtsig_del(void *, struct event *);
int rtsig_recalc(struct event_base *, void *, int);
int rtsig_dispatch(struct event_base *, void *, struct timeval *);

struct eventop rtsigops = {
  "rtsig",
  rtsig_init,
  rtsig_add,
  rtsig_del,
  rtsig_recalc,
  rtsig_dispatch
};

static void
signal_handler( int sig, siginfo_t *info, void *ctx ) {
  // the signal handler for all libevent-managed signals
  // only used if we need to do a blocking poll() call due to non-socket fd's
  // in the pollset.
  
  siginfo_t *i = info;
  siginfo_t i_local;

  if ( trouble[ sig - 1 ] ) {
    i_local.si_signo = SIGIO;
    i_local.si_errno = 0;
    i_local.si_code = 0;
    i = &i_local;
    trouble[ sig - 1 ] = 0;
  }

  if ( send( signal_send_fd[ sig - 1 ], i, sizeof( *i ), MSG_DONTWAIT|MSG_NOSIGNAL ) == -1 ) {
    trouble[ sig - 1 ] = 1;
  }
  return;
}

static void donothing( int fd, short event, void *arg ) {
  // callback for our signal_recv_fd event structure
  // we don't want to act on these events, we just want to wake the poll()
};

static void
signotset( sigset_t *set ) {
  int i, l;
  l = sizeof( *set ) / 4;
  for ( i = 0; i < l; i++ ) {
    ((unsigned *)set)[i] = ~((unsigned *)set)[i];
  }
}

/*  The next three functions manage our private data about each event struct */

static int grow_fdset( struct rtsigop *op, int newhigh ) {
  // grow the fd -> rtdata array because we have encountered a new fd too high
  // to fit in the existing array

  struct rtdata_list **p;
  struct rtdata_list *datset;
  int i,x;
  int newcnt = ( newhigh + 1 ) << 1;

  if ( newhigh <= op->highestfd ) return ( 0 );

  p = op->fdtodat;
  p = realloc( op->fdtodat, sizeof( struct rtdata_list * ) * newcnt );
  if ( p == NULL ) return( -1 );
  op->fdtodat = p;

  datset = calloc( newcnt - ( op->highestfd + 1 ), sizeof( struct rtdata_list ) );
  if ( datset == NULL ) return( -1 );

  for ( i = op->highestfd + 1, x = 0;
	i < newcnt;
	i++, x++
	) {
    op->fdtodat[i] = &( datset[x] );
    TAILQ_INIT( op->fdtodat[i] );
  }

  op->highestfd = newcnt - 1;
  return( 0 );
}

static struct rtdata *
ev2dat( struct rtsigop *op, struct event *ev, int create ) {
  // given an event struct, find the dat structure that corresponds to it
  // if create is non-zero and the rtdata structure does not exist, create it
  // return NULL if not found

  int found = 0;
  int fd = ev->ev_fd;
  struct rtdata *ret = NULL;

  if ( op->highestfd < fd && create )
    if ( grow_fdset( op, fd ) == -1 )
      return( NULL );
  
  TAILQ_FOREACH( ret, op->fdtodat[ fd ], next )
    if ( ret->ev == ev ) {
      found = 1;
      break;
    }

  if ( ! found ) {
    if ( ! create ) return( NULL );

    ret = calloc( 1, sizeof( struct rtdata ) );
    if ( ret == NULL ) return( NULL );
    ret->ev = ev;
    ret->poll_position = -1;
    TAILQ_INSERT_TAIL( op->fdtodat[ fd ], ret, next );
  }

  return( ret );
}

static void
dat_del( struct rtsigop *op, struct rtdata *dat ) {
  // delete our private notes about a given event struct
  // called from rtsig_del() only
  int fd;
  if ( dat == NULL ) return;
  fd = dat->ev->ev_fd;

  TAILQ_REMOVE( op->fdtodat[ fd ], dat, next );
  memset( dat, 0, sizeof( *dat ) );
  free( dat );
}


static void
set_sigaction( int sig ) {
  // set the standard handler for any libevent-managed signal, including the 
  //  rtsig used for O_ASYNC notifications
  struct sigaction act;

  act.sa_flags = SA_RESTART | SA_SIGINFO;
  sigfillset( &( act.sa_mask ) );
  act.sa_sigaction = &signal_handler;
  sigaction( sig, &act, NULL );
}

static int find_rt_signal() {
  // find an unused rtsignal
  struct sigaction act;
  int sig = SIGRTMIN;

  while ( sig <= SIGRTMAX ) {
    if ( sigaction( sig, NULL, &act ) != 0 ) {
      if ( errno == EINTR ) continue;
    } else {
      if ( act.sa_flags & SA_SIGINFO ) {
        if ( act.sa_sigaction == NULL ) return( sig );
      } else {
        if ( act.sa_handler == SIG_DFL ) return( sig );
      }
    }
    sig++;
  }
  return( 0 );
}

/* the next three functions manage our pollset and the memory management for 
   fd -> rtdata -> event -> poll_position maps
*/

static int
poll_add(struct rtsigop *op, struct event *ev, struct rtdata *dat)
{
  struct pollfd *pfd;
  int newmax = op->max << 1;
  int pp;

  if ( op->poll == NULL )
    return( 0 );

  if ( dat == NULL )
    dat = ev2dat( op, ev, 0 );

  if ( dat == NULL ) return( 0 );

  pp = dat->poll_position;

  if ( pp != -1 ) {
    pfd = &op->poll[ pp ];
    if ( ev->ev_events & EV_READ )
      pfd->events |= POLLIN;
    
    if ( ev->ev_events & EV_WRITE )
      pfd->events |= POLLOUT;
    
    return( 0 );
  }

  if ( op->cur == op->max ) {
    void *p;

    p = realloc( op->poll, sizeof( *op->poll ) * newmax );

    if ( !p ) {
      errno = ENOMEM;
      return( -1 );
    }
    op->poll = p;

    p = realloc( op->ptodat, sizeof( *op->ptodat ) * newmax );
    if ( !p ) {
      op->poll = realloc( op->poll, sizeof( *op->poll ) * op->max ); // shrink the pollset back down
      errno = ENOMEM;
      return( -1 );
    }
    op->ptodat = p;
    op->max = newmax;
  }

  pfd = &op->poll[ op->cur ];
  pfd->fd = ev->ev_fd;
  pfd->revents = 0;
  pfd->events = 0;

  if ( ev->ev_events & EV_READ )
    pfd->events |= POLLIN;
  
  if ( ev->ev_events & EV_WRITE )
    pfd->events |= POLLOUT;
  
  op->ptodat[ op->cur ] = dat;
  dat->poll_position = op->cur;
  op->cur++;

  return( 0 );
}

static void
poll_free(struct rtsigop *op, int n)
{
  if ( op->poll == NULL ) return;

  op->cur--;

  if ( n < op->cur ) {
    memcpy( &op->poll[n], &op->poll[ op->cur ], sizeof( *op->poll ) );
    op->ptodat[ n ] = op->ptodat[ op->cur ];
    op->ptodat[ n ]->poll_position = n;
  }


  // less then half the max in use causes us to shrink
  if ( op->max > INIT_MAX && op->cur < op->max >> 1 ) {
    op->max >>= 1;
    op->poll = realloc( op->poll, sizeof( *op->poll ) * op->max );
    op->ptodat = realloc( op->ptodat, sizeof( *op->ptodat ) * op->max );
  }
}

static void
poll_remove(struct rtsigop *op, struct event *ev, struct rtdata *dat)
{
  int pp;
  if ( dat == NULL )
    dat = ev2dat( op, ev, 0 );

  if ( dat == NULL ) return;

  pp = dat->poll_position;
  if ( pp != -1 ) {
    poll_free( op, pp );
    dat->poll_position = -1;
  }
}

static void
activate(struct event *ev, int flags)
{
  // activate an event, possibly removing one-shot events
  if ( !( ev->ev_events & EV_PERSIST ) ) event_del( ev );
  event_active( ev, flags, 1 );
}

#define FD_CLOSEONEXEC(x) do { \
        if (fcntl(x, F_SETFD, 1) == -1) \
                event_warn("fcntl(%d, F_SETFD)", x); \
} while (0)

void *
rtsig_init(void)
{
  struct rtsigop *op;
  int sockets[2];
  int optarg;
  struct rtdata *dat;
  int flags;

  if ( getenv( "EVENT_NORTSIG" ) )
    goto err;

  op = calloc( 1, sizeof( *op ) );
  if ( op == NULL )
    goto err;

  op->max = INIT_MAX;
  op->poll = malloc( sizeof( *op->poll ) * op->max );
  if ( op->poll == NULL ) 
    goto err_free_op;

  op->signo = find_rt_signal();
  if ( op->signo == 0 ) {
    goto err_free_poll;
  }
  
  op->nonsock = 0;

  if ( socketpair( PF_UNIX, SOCK_DGRAM, 0, sockets ) != 0 ) {
    goto err_free_poll;
  }

  FD_CLOSEONEXEC( sockets[0] );
  FD_CLOSEONEXEC( sockets[1] );

  signal_send_fd[ op->signo - 1 ] = sockets[0];
  trouble[ op->signo - 1 ] = 0;
  op->signal_send_fd = sockets[0];
  op->signal_recv_fd = sockets[1];
  flags = fcntl( op->signal_recv_fd, F_GETFL );
  fcntl( op->signal_recv_fd, F_SETFL, flags | O_NONBLOCK );

  optarg = MAXBUFFERSIZE;
  setsockopt( signal_send_fd[ op->signo - 1 ],
	      SOL_SOCKET,
	      SO_SNDBUF, 
	      &optarg,
	      sizeof( optarg ) );
  
  optarg = MAXBUFFERSIZE;
  setsockopt( op->signal_recv_fd,
	      SOL_SOCKET,
	      SO_RCVBUF,
	      &optarg,
	      sizeof( optarg ) );

  op->highestfd = -1;
  op->fdtodat = NULL;
  if ( grow_fdset( op, 1 ) == -1 ) {
    goto err_close_pair;
  }

  op->ptodat = malloc( sizeof( *op->ptodat ) * op->max );
  if ( op->ptodat == NULL ) {
    goto err_close_pair;
  }

  sigemptyset( &op->sigs );
  sigaddset( &op->sigs, SIGIO );
  sigaddset( &op->sigs, op->signo );
  sigprocmask( SIG_BLOCK, &op->sigs, NULL );
  set_sigaction( SIGIO );
  set_sigaction( op->signo );

  event_set( &( op->sigfdev ), op->signal_recv_fd, EV_READ | EV_PERSIST, donothing, NULL );
  op->sigfdev.ev_flags |= EVLIST_DONTDEL;
  dat = ev2dat( op, &( op->sigfdev ), 1 );
  poll_add( op, &( op->sigfdev ), dat );

  return( op );

 err_close_pair:
    close( op->signal_recv_fd );
    close( signal_send_fd[ op->signo - 1 ] );

 err_free_poll:
    free( op->poll );
 
 err_free_op:
    free( op );
 err:
    return( NULL );
}

int
rtsig_add(void *arg, struct event *ev)
{
  struct rtsigop *op = (struct rtsigop *) arg;
  int flags, i;
  struct stat statbuf;
  struct rtdata *dat;

  if ( ev->ev_events & EV_SIGNAL ) {
    int signo = EVENT_SIGNAL( ev );
  
    sigaddset( &op->sigs, EVENT_SIGNAL( ev ) );
    if ( sigprocmask( SIG_BLOCK, &op->sigs, NULL ) == -1 )
      return( -1 );
    
    set_sigaction( signo );
    
    signal_send_fd[ signo - 1 ] = op->signal_send_fd;
    trouble[ signo - 1 ] = 0;

    return( 0 );
  }

  if ( !( ev->ev_events & ( EV_READ | EV_WRITE ) ) ) 
    return( 0 );

  if ( -1 == fstat( ev->ev_fd, &statbuf ) )
    return( -1 );

  if ( !S_ISSOCK( statbuf.st_mode ) )
    ev->ev_flags |= EVLIST_NONSOCK;

  flags = fcntl( ev->ev_fd, F_GETFL );
  if ( flags == -1 )
    return( -1 );

  if ( !( flags & O_ASYNC ) ) {
    if ( fcntl( ev->ev_fd, F_SETSIG, op->signo ) == -1 ||
	 fcntl( ev->ev_fd, F_SETOWN, (int) gettid() ) == -1 )
      return (-1);
    
    // the overhead of always handling writeable edges isn't going to be that bad...
    if ( fcntl( ev->ev_fd, F_SETFL, flags | O_ASYNC | O_RDWR ) ) 
      return (-1);
  }

#ifdef O_ONESIGFD
  // F_SETAUXFL and O_ONESIGFD are defined in a non-standard linux kernel patch to coalesce events for fds
  fcntl( ev->ev_fd, F_SETAUXFL, O_ONESIGFD );
#endif

  dat = ev2dat( op, ev, 1 );
  if ( dat == NULL ) return( -1 );

  op->total++;
  if ( ev->ev_flags & EVLIST_NONSOCK ) op->nonsock++;

  if ( poll_add( op, ev, dat ) == -1 ) {
    // must check the level of new fd's
    i = errno;
    fcntl( ev->ev_fd, F_SETFL, flags );
    errno = i;
    return( -1 );
  }

  return( 0 );
}

int
rtsig_del(void *arg, struct event *ev)
{
  struct rtdata *dat;
  struct rtsigop *op = (struct rtsigop *) arg;

  if ( ev->ev_events & EV_SIGNAL ) {
    sigset_t sigs;

    sigdelset( &op->sigs, EVENT_SIGNAL( ev ) );
    
    sigemptyset( &sigs );
    sigaddset( &sigs, EVENT_SIGNAL( ev ) );
    return( sigprocmask( SIG_UNBLOCK, &sigs, NULL ) );
  }

  if ( !( ev->ev_events & ( EV_READ | EV_WRITE ) ) )
    return( 0 );

  dat = ev2dat( op, ev, 0 );
  poll_remove( op, ev, dat );
  dat_del( op, dat );
  op->total--;
  if ( ev->ev_flags & EVLIST_NONSOCK ) op->nonsock--;

  return( 0 );
}

int
rtsig_recalc(struct event_base *base, void *arg, int max)
{
  return (0);
}

/* the following do_X functions implement the different stages of a single
   eventloop pass: poll(), recv(sigsock), sigtimedwait()
   
   do_siginfo_dispatch() is a common factor to both do_sigwait() and
   do_signals_from_socket().
*/

static inline int
do_poll( struct rtsigop *op, struct timespec *ts ) {

  int res = 0;
  int i = 0;
  
  if ( op->cur > 1 ) { // non-empty poll set (modulo the signalfd)
    if ( op->nonsock ) {
      int timeout = ts->tv_nsec / 1000000 + ts->tv_sec * 1000;

      sigprocmask( SIG_UNBLOCK, &( op->sigs ), NULL );

      res = poll( op->poll, op->cur, timeout );
     
      sigprocmask( SIG_BLOCK, &( op->sigs ), NULL );

      ts->tv_sec = 0;
      ts->tv_nsec = 0;
    } else {
      res = poll( op->poll, op->cur, 0 );
    }

    if ( res < 0 ) {
      return( errno == EINTR ? 0 : -1 );
    } else if ( res ) {
      ts->tv_sec = 0;
      ts->tv_nsec = 0;
    }

    i = 0;
    while ( i < op->cur ) {
      struct rtdata *dat = op->ptodat[ i ];
      struct event *ev = dat->ev;

      if ( op->poll[ i ].revents ) {
	int flags = 0;
	
	if ( op->poll[ i ].revents & ( POLLIN | POLLERR ) )
	  flags |= EV_READ;
	
	if ( op->poll[ i ].revents & POLLOUT )
	  flags |= EV_WRITE;
	
	if ( !( ev->ev_events & EV_PERSIST ) ) {
	  poll_remove( op, ev, op->ptodat[ i ] );
	  event_del( ev );
	} else {
	  i++;
	}
	
	event_active( ev, flags, 1 );
      } else {
	if ( ev->ev_flags & ( EVLIST_NONSOCK|EVLIST_DONTDEL ) ) {
	  i++;
	} else {
	  poll_remove( op, ev, op->ptodat[ i ] );
	}
      }
    }
  }
  return( res );
}

static inline int
do_siginfo_dispatch( struct event_base *base, struct rtsigop *op, siginfo_t *info ) {
  int signum;
  struct rtdata *dat, *next_dat;
  struct event *ev, *next_ev;

  if ( info == NULL ) return( -1 );

  signum = info->si_signo;
  if ( signum == op->signo ) {
    int flags, sigok = 0;
    flags = 0;

    if ( info->si_band & ( POLLIN | POLLERR ) )  flags |= EV_READ;
    if ( info->si_band & POLLOUT ) flags |= EV_WRITE;

    if ( !flags )
      return( 0 );

    if ( info->si_fd > op->highestfd )
      return( -1 );

    dat = TAILQ_FIRST( op->fdtodat[ info->si_fd ] );
    while ( dat != TAILQ_END( op->fdtodat[ info->si_fd ] ) ) {
      next_dat = TAILQ_NEXT( dat, next );
      if ( flags & dat->ev->ev_events ) {
	ev = dat->ev;
	poll_add( op, ev, dat );
	activate( ev, flags & ev->ev_events );
	sigok = 1;
      }
      dat = next_dat;
    }
  } else if ( signum == SIGIO ) {
    TAILQ_FOREACH( ev, &base->eventqueue, ev_next ) {
      if ( ev->ev_events & ( EV_READ | EV_WRITE ) )
	poll_add( op, ev, NULL );
    }
    return( 1 ); // 1 means the caller should poll() again
    
  } else if ( sigismember( &op->sigs, signum ) ) {
    // managed signals are queued
    ev = TAILQ_FIRST( &signalqueue );
    while( ev != TAILQ_END( &signalqueue ) ) {
      next_ev = TAILQ_NEXT( ev, ev_signal_next );
      if ( EVENT_SIGNAL( ev ) == signum )
	activate( ev, EV_SIGNAL );
      ev = next_ev;
    }
  } else {
    // dispatch unmanaged signals immediately
    struct sigaction sa;
    if ( sigaction( signum, NULL, &sa ) == 0 ) {
      if ( ( sa.sa_flags & SA_SIGINFO ) && sa.sa_sigaction ) {
	(*sa.sa_sigaction)( signum, info, NULL );
      } else if ( sa.sa_handler ) {
	if ( (int)sa.sa_handler != 1 ) {
	  (*sa.sa_handler)( signum );
	}
      } else {
	if ( signum != SIGCHLD ) {
	  kill( gettid(), signum ); // non-blocked SIG_DFL
	}
      }
    }
  }
  return( 0 );
}

static inline int
do_sigwait( struct event_base *base, struct rtsigop *op, struct timespec *ts, sigset_t *sigs ) {
  // return 1 if we should poll again
  // return 0 if we are all set
  // return -1 on error

  for (;;) {
    siginfo_t info;
    int signum;

    signum = sigtimedwait( sigs, &info, ts );

    ts->tv_sec = 0;
    ts->tv_nsec = 0;

    if ( signum == -1 ) {
      if ( errno == EAGAIN )
	return( 0 );
      
      return ( errno == EINTR ? 0 : -1 );

    } else if ( 1 == do_siginfo_dispatch( base, op, &info ) ) {
      return( 1 );
    }
  }
  return( 0 ); // cannot be reached anyways
}

static inline int
do_signals_from_socket( struct event_base *base, struct rtsigop *op, struct timespec *ts ) {
  int fd = op->signal_recv_fd;
  siginfo_t info;
  int res;

  for(;;) {
    res = recv( fd, &info, sizeof( info ), MSG_NOSIGNAL );
    if ( res == -1 ) {
      if ( errno == EAGAIN ) return( 0 );
      if ( errno == EINTR ) continue;
      return( -1 );
    } else {
      ts->tv_sec = 0;
      ts->tv_nsec = 0;
      if ( 1 == do_siginfo_dispatch( base, op, &info ) ) {
	return( 1 );
      }
    }
  }
}

int
rtsig_dispatch(struct event_base *base, void *arg, struct timeval *tv)
{
  struct rtsigop *op = (struct rtsigop *) arg;
  struct timespec ts;
  int res;
  sigset_t sigs;

  ts.tv_sec = tv->tv_sec;
  ts.tv_nsec = tv->tv_usec * 1000;

 poll_for_level:
  res = do_poll( op, &ts ); // ts can be modified in do_XXX()

  res = do_signals_from_socket( base, op, &ts );
  if ( res == 1 )
    goto poll_for_level;
  else if ( res == -1 )
    return( -1 );

  //   the mask = managed_signals | unblocked-signals
  // MM - if this is not blocking do we need to cast the net this wide?
  sigemptyset( &sigs );
  sigprocmask( SIG_BLOCK, &sigs, &sigs );
  signotset( &sigs );
  sigorset( &sigs, &sigs, &op->sigs );

  res = do_sigwait( base, op, &ts, &sigs );

  if ( res == 1 )
    goto poll_for_level;
  else if ( res == -1 )
    return( -1 );

  return( 0 );
}
