On Mon, Jun 08, 2009 at 06:14:31PM +0200, Reyk Floeter wrote:
> hi,
> 
> the idea sounds ok, but why just 128?  tcpbench is for benchmarking
> and testing and it should be possible to run more concurrent
> connections.
> 
> it could call getrlimit() to get the actual RLIMIT_NOFILE value which
> is 128 by default but can be much higher.  another variant is the way
> spamd/spamd.c is doing it by looking at KERN_MAXFILES.
> 

Hmm, I didn't know we had a soft limit; I thought you had to recompile
the kernel and tune max files. I agree, I'll add support for expanding
the limit. Is there any sane limit? Or can I push it as far as it goes?

I haven't paid much attention to the kvars monitoring yet; are those
values per process in the kernel?

Another feature I was thinking of would be to dump the output into one
file per host. What do you guys think? Besides correcting the fd limit
and adding udp support, any other ideas?

Best regards.

> 
> On Mon, Jun 08, 2009 at 10:44:37AM -0300, Christiano Farina Haesbaert wrote:
> > Hi,
> > 
> > The following patch makes tcpbench(1) non-forking and non-blocking,
> > I've changed the output to show the file descriptor instead of the
> > pid.
> > 
> > The server will no longer die due to excessive forking as it's all
> > wrapped in a single process; we are, however, limited to 128 fds.
> > 
> > There is an error when mapping certain kvars with more than one
> > connection as there is only one single process.
> > I've noticed a small drop in performance when using more than 10
> > connections, but it seems irrelevant.
> > 
> > This was suggested by Henning at misc@,
> > 
> > Any feedback is welcome, I'll be working on udp support for now.
> > 
> > Index: tcpbench.c
> > ===================================================================
> > RCS file: /cvs/src/usr.bin/tcpbench/tcpbench.c,v
> > retrieving revision 1.8
> > diff -u -d -r1.8 tcpbench.c
> > --- tcpbench.c      18 Sep 2008 10:23:33 -0000      1.8
> > +++ tcpbench.c      8 Jun 2009 13:46:48 -0000
> > @@ -50,14 +50,14 @@
> >  #define DEFAULT_PORT               "12345"
> >  #define DEFAULT_STATS_INTERVAL     1000            /* ms */
> >  #define DEFAULT_BUF                256 * 1024
> > +#define MAX_FD                  128
> >  
> >  sig_atomic_t done = 0;
> > -sig_atomic_t print_stats = 0;
> >  
> >  struct statctx {
> >     struct timeval t_start, t_last, t_cur;
> >     unsigned long long bytes;
> > -   pid_t pid;
> > +   int fd;
> >     u_long tcbaddr;
> >     kvm_t *kh;
> >     char **kvars;
> > @@ -103,7 +103,6 @@
> >  static void
> >  alarmhandler(int signo)
> >  {
> > -   print_stats = 1;
> >     signal(signo, alarmhandler);
> >  }
> >  
> > @@ -309,24 +308,22 @@
> >  {
> >     struct itimerval itv;
> >     int i;
> > -
> > +   
> >     if (rflag <= 0)
> >             return;
> > +   sc->fd = fd;
> >     sc->kh = kh;
> >     sc->kvars = kflag;
> > +   sc->bytes = 0;
> >     if (kflag)
> >             sc->tcbaddr = kfind_tcb(kh, ktcbtab, fd, vflag);
> >     gettimeofday(&sc->t_start, NULL);
> >     sc->t_last = sc->t_start;
> > -   signal(SIGALRM, alarmhandler);
> >     itv.it_interval.tv_sec = rflag / 1000;
> >     itv.it_interval.tv_usec = (rflag % 1000) * 1000;
> >     itv.it_value = itv.it_interval;
> > -   setitimer(ITIMER_REAL, &itv, NULL);
> > -   sc->bytes = 0;
> > -   sc->pid = getpid();
> >  
> > -   printf("%8s %12s %14s %12s ", "pid", "elapsed_ms", "bytes", "Mbps");
> > +   printf("%8s %12s %14s %12s ", "fd", "elapsed_ms", "bytes", "Mbps");
> >     if (sc->kvars != NULL) {
> >             for (i = 0; sc->kvars[i] != NULL; i++)
> >                     printf("%s%s", i > 0 ? "," : "", sc->kvars[i]);
> > @@ -342,7 +339,7 @@
> >  }
> >  
> >  static void
> > -stats_display(struct statctx *sc)
> > +stats_display(struct statctx *sc, int rflag)
> >  {
> >     struct timeval t_diff;
> >     unsigned long long total_elapsed, since_last;
> > @@ -356,11 +353,13 @@
> >     total_elapsed = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
> >     timersub(&sc->t_cur, &sc->t_last, &t_diff);
> >     since_last = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
> > -   printf("%8ld %12llu %14llu %12.3Lf ", (long)sc->pid,
> > +   if (since_last <= rflag)
> > +           return;
> > +   printf("%8d %12llu %14llu %12.3Lf ", sc->fd,
> >         total_elapsed, sc->bytes,
> >         (long double)(sc->bytes * 8) / (since_last * 1000.0));
> >     sc->t_last = sc->t_cur;
> > -   sc->bytes = 0;
> > +   sc->bytes = 0; 
> >  
> >     if (sc->kvars != NULL) {
> >             kupdate_stats(sc->kh, sc->tcbaddr, &inpcb, &tcpcb, &sockb);
> > @@ -403,91 +402,31 @@
> >     fflush(stdout);
> >  }
> >  
> > -static void
> > -stats_finish(struct statctx *sc)
> > -{
> > -   struct itimerval itv;
> > -
> > -   signal(SIGALRM, SIG_DFL);
> > -   bzero(&itv, sizeof(itv));
> > -   setitimer(ITIMER_REAL, &itv, NULL);
> > -}
> > -
> > -static void __dead
> > -handle_connection(kvm_t *kvmh, u_long ktcbtab, int sock, int vflag,
> > -    int rflag, char **kflag, int Bflag)
> > -{
> > -   char *buf;
> > -   struct pollfd pfd;
> > -   ssize_t n;
> > -   int r;
> > -   struct statctx sc;
> > -
> > -   if ((buf = malloc(Bflag)) == NULL)
> > -           err(1, "malloc");
> > -   if ((r = fcntl(sock, F_GETFL, 0)) == -1)
> > -           err(1, "fcntl(F_GETFL)");
> > -   r |= O_NONBLOCK;
> > -   if (fcntl(sock, F_SETFL, r) == -1)
> > -           err(1, "fcntl(F_SETFL, O_NONBLOCK)");
> > -
> > -   signal(SIGINT, exitsighand);
> > -   signal(SIGTERM, exitsighand);
> > -   signal(SIGHUP, exitsighand);
> > -   signal(SIGPIPE, SIG_IGN);
> > -
> > -   bzero(&pfd, sizeof(pfd));
> > -   pfd.fd = sock;
> > -   pfd.events = POLLIN;
> > -
> > -   stats_prepare(&sc, sock, kvmh, ktcbtab, rflag, vflag, kflag);
> > -
> > -   while (!done) {
> > -           if (print_stats) {
> > -                   stats_display(&sc);
> > -                   print_stats = 0;
> > -           }
> > -           if (poll(&pfd, 1, INFTIM) == -1) {
> > -                   if (errno == EINTR)
> > -                           continue;
> > -                   err(1, "poll");
> > -           }
> > -           if ((n = read(pfd.fd, buf, Bflag)) == -1) {
> > -                   if (errno == EINTR || errno == EAGAIN)
> > -                           continue;
> > -                   err(1, "read");
> > -           }
> > -           if (n == 0) {
> > -                   fprintf(stderr, "%8ld closed by remote end\n",
> > -                       (long)getpid());
> > -                   done = -1;
> > -                   break;
> > -           }
> > -           if (vflag >= 3)
> > -                   fprintf(stderr, "read: %zd bytes\n", n);
> > -           stats_update(&sc, n);
> > -   }
> > -   stats_finish(&sc);
> > -
> > -   free(buf);
> > -   close(sock);
> > -   exit(1);
> > -}
> > -
> >  static void __dead
> >  serverloop(kvm_t *kvmh, u_long ktcbtab, struct addrinfo *aitop,
> >      int vflag, int rflag, char **kflag, int Sflag, int Bflag)
> >  {
> > +   
> > +   char *buf;
> >     char tmp[128];
> > -   int r, sock, client_id, on = 1;
> > +   int i, r, sock, client_id, fdval, nextfree, on = 1;
> > +   size_t listenfds, nfds, lastlfd;
> > +   socklen_t sslen;
> > +   ssize_t n;
> >     struct addrinfo *ai;
> > -   struct pollfd *pfd;
> > +   struct pollfd pfd[MAX_FD];
> >     struct sockaddr_storage ss;
> > -   socklen_t sslen;
> > -   size_t nfds, i, j;
> > +   struct statctx psc[MAX_FD];
> >  
> > -   pfd = NULL;
> > -   nfds = 0;
> > +   bzero(pfd, sizeof(pfd));
> > +   bzero(psc, sizeof(pfd));
> > +   nfds = listenfds = 0;
> > +   lastlfd = -1;
> > +   if ((buf = malloc(Bflag)) == NULL)
> > +           err(1, "malloc");
> > +   for (i = 0; i < MAX_FD; ++i)
> > +           pfd[i].fd = -1;
> > +   
> >     for (ai = aitop; ai != NULL; ai = ai->ai_next) {
> >             saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp, sizeof(tmp));
> >             if (vflag)
> > @@ -500,9 +439,16 @@
> >                             warn("socket");
> >                     continue;
> >             }
> > -           if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
> > -               &on, sizeof(on)) == -1)
> > +
> > +           if ((fdval = fcntl(sock, F_GETFL, 0)) == -1)
> > +                   err(1, "fcntl(F_GETFL)");
> > +           fdval |= O_NONBLOCK;
> > +           if (fcntl(sock, F_SETFL, fdval) == -1)
> > +                   err(1, "fcntl(F_SETFL, O_NONBLOCK)");
> > +           if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on,
> > +                   sizeof(on)) == -1)
> >                     warn("reuse port");
> > +
> >             if (bind(sock, ai->ai_addr, ai->ai_addrlen) != 0) {
> >                     if (ai->ai_next == NULL)
> >                             err(1, "bind");
> > @@ -524,29 +470,23 @@
> >                     close(sock);
> >                     continue;
> >             }
> > -           if (nfds > 128)
> > +           if (nfds > MAX_FD - 1)
> >                     break;
> > -           if ((pfd = realloc(pfd, ++nfds * sizeof(*pfd))) == NULL)
> > -                   errx(1, "realloc(pfd * %zu)", nfds);
> > +
> > +           nfds++;
> > +           lastlfd = sock;
> >             pfd[nfds - 1].fd = sock;
> >             pfd[nfds - 1].events = POLLIN;
> >     }
> >     freeaddrinfo(aitop);
> >     if (nfds == 0)
> >             errx(1, "No working listen addresses found");
> > -
> > -   signal(SIGINT, exitsighand);
> > -   signal(SIGTERM, exitsighand);
> > -   signal(SIGHUP, exitsighand);
> > -   signal(SIGPIPE, SIG_IGN);
> > -   signal(SIGCHLD, SIG_IGN);
> > -
> > -   if (setpgid(0, 0) == -1)
> > -           err(1, "setpgid");
> > -
> > +   listenfds = nfds;
> > +   
> >     client_id = 0;
> > -   while (!done) {         
> > -           if ((r = poll(pfd, nfds, INFTIM)) == -1) {
> > +   while (!done) {
> > +           fflush(stdout);
> > +           if ((r = poll(pfd, MAX_FD, INFTIM)) == -1) {
> >                     if (errno == EINTR)
> >                             continue;
> >                     warn("poll");
> > @@ -554,52 +494,82 @@
> >             }
> >             if (vflag >= 3)
> >                     fprintf(stderr, "poll: %d\n", r);
> > -           for (i = 0 ; r > 0 && i < nfds; i++) {
> > -                   if ((pfd[i].revents & POLLIN) == 0)
> > +
> > +           for (i = 0 ; i < MAX_FD && r > 0; i++) {
> > +                   if (pfd[i].fd == -1 || (pfd[i].revents & POLLIN) == 0)
> >                             continue;
> > +                   r--;
> > +                           
> >                     if (vflag >= 3)
> >                             fprintf(stderr, "fd %d active\n", pfd[i].fd);
> > -                   r--;
> > -                   sslen = sizeof(ss);
> > -                   if ((sock = accept(pfd[i].fd, (struct sockaddr *)&ss,
> > -                       &sslen)) == -1) {
> > -                           if (errno == EINTR)
> > +                   if (pfd[i].fd <= lastlfd) {
> > +                           sslen = sizeof(ss);
> > +                           if ((sock = accept(pfd[i].fd,
> > +                               (struct sockaddr *)&ss, &sslen)) == -1) {
> > +                                   if (errno == EINTR)
> > +                                           continue;
> > +                                   warn("accept");
> >                                     continue;
> > -                           warn("accept");
> > -                           break;
> > +                           }
> > +                           saddr_ntop((struct sockaddr *)&ss, sslen,
> > +                               tmp, sizeof(tmp));
> > +
> > +                           for (nextfree = 0; nextfree < MAX_FD; 
> > nextfree++) 
> > +                                   if (pfd[nextfree].fd == -1)
> > +                                           break;
> > +                           /* We shouldn't fall here as accept will fail 
> > first */
> > +                           if (nextfree >= MAX_FD) {
> > +                                   fprintf(stderr, "maximum number of 
> > connection reached\n");
> > +                                   close(sock);
> > +                                   continue;
> > +                           }
> > +
> > +                           nfds++;
> > +                           pfd[nextfree].fd = sock;
> > +                           pfd[nextfree].events = POLLIN;
> > +                           stats_prepare(&psc[(nfds - 1) - listenfds],
> > +                               sock, kvmh, ktcbtab, rflag, vflag, kflag);
> > +                           if (vflag)
> > +                                   fprintf(stderr, "Accepted connection %d 
> > from "
> > +                                       "%s, fd = %d\n", client_id++, tmp, 
> > sock);
> >                     }
> > -                   saddr_ntop((struct sockaddr *)&ss, sslen,
> > -                       tmp, sizeof(tmp));
> > -                   if (vflag)
> > -                           fprintf(stderr, "Accepted connection %d from "
> > -                               "%s, fd = %d\n", client_id++, tmp, sock);
> > -                   switch (fork()) {
> > -                   case -1:
> > -                           warn("fork");
> > -                           done = -1;
> > -                           break;
> > -                   case 0:
> > -                           for (j = 0; j < nfds; j++)
> > -                                   if (j != i)
> > -                                           close(pfd[j].fd);
> > -                           handle_connection(kvmh, ktcbtab, sock,
> > -                               vflag, rflag, kflag, Bflag);
> > -                           /* NOTREACHED */
> > -                           _exit(1);
> > -                   default:
> > -                           close(sock);
> > -                           break;
> > +                   else {
> > +                           n = read(pfd[i].fd, buf, Bflag);
> > +                           if (n == 0) {
> > +                                   fprintf(stderr,
> > +                                       "fd %d closed by remote end\n",
> > +                                       pfd[i].fd);
> > +                                   close(pfd[i].fd);
> > +                                   nfds--;
> > +                                   pfd[i].fd = -1;
> > +                           }
> > +                           else if (n == -1) {
> > +                                   if (errno == EINTR || errno == EAGAIN)
> > +                                           continue;
> > +                                   warn("fd %d, read error\n", pfd[i].fd);
> > +                                   close(pfd[i].fd);
> > +                                   nfds--;
> > +                                   pfd[i].fd = -1;
> > +                           }
> > +                           else {
> > +                                   if (vflag >= 3)
> > +                                           fprintf(stderr,
> > +                                               "read: %zd bytes from fd: 
> > %d\n",
> > +                                               n, pfd[i].fd);
> > +                                   stats_update(&psc[i - listenfds], n);
> > +                                   stats_display(&psc[i - listenfds],
> > +                                       rflag);
> > +                           }
> >                     }
> >                     if (done == -1)
> >                             break;
> >             }
> >     }
> > +
> >     for (i = 0; i < nfds; i++)
> >             close(pfd[i].fd);
> >     if (done > 0)
> >             warnx("Terminated by signal %d", done);
> > -   signal(SIGTERM, SIG_IGN);
> > -   killpg(0, SIGTERM);
> >     exit(1);
> >  }
> >  
> > @@ -632,7 +602,7 @@
> >                     else
> >                             errx(1, "c getaddrinfo: %s", 
> > gai_strerror(herr));
> >             }
> > -
> > +           
> >             for (sock = -1, ai = aitop; ai != NULL; ai = ai->ai_next) {
> >                     saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp,
> >                         sizeof(tmp));
> > @@ -681,18 +651,11 @@
> >             fprintf(stderr, "%u connections established\n", scnt);
> >     arc4random_buf(buf, Bflag);
> >  
> > -   signal(SIGINT, exitsighand);
> > -   signal(SIGTERM, exitsighand);
> > -   signal(SIGHUP, exitsighand);
> > -   signal(SIGPIPE, SIG_IGN);
> > -
> >     stats_prepare(&sc, sock, kvmh, ktcbtab, rflag, vflag, kflag);
> >  
> >     while (!done) {
> > -           if (print_stats) {
> > -                   stats_display(&sc);
> > -                   print_stats = 0;
> > -           }
> > +           stats_display(&sc, rflag);
> > +           
> >             if (poll(pfd, nconn, INFTIM) == -1) {
> >                     if (errno == EINTR)
> >                             continue;
> > @@ -717,7 +680,6 @@
> >                     }
> >             }
> >     }
> > -   stats_finish(&sc);
> >  
> >     if (done > 0)
> >             warnx("Terminated by signal %d", done);
> > @@ -803,6 +765,8 @@
> >                     if (errstr != NULL)
> >                             errx(1, "number of connections is %s: %s",
> >                                 errstr, optarg);
> > +                   if (nconn > MAX_FD)
> > +                           errx(1, "maximum number of connections is %d", 
> > MAX_FD);
> >                     break;
> >             case 'h':
> >             default:
> > @@ -842,6 +806,11 @@
> >                     errx(1, "kvm: no namelist");
> >     } else
> >             drop_gid();
> > +
> > +   signal(SIGINT, exitsighand);
> > +   signal(SIGTERM, exitsighand);
> > +   signal(SIGHUP, exitsighand);
> > +   signal(SIGPIPE, SIG_IGN);
> >  
> >     if (sflag)
> >             serverloop(kvmh, nl[0].n_value, aitop, vflag, rflag, kflag,
> > 
> > 
> > 
> > 
> > -- 
> > Christiano Farina HAESBAERT
> > Do NOT send me html mail.
> > 

-- 
Christiano Farina HAESBAERT
Do NOT send me html mail.

Reply via email to