hi, the idea sounds ok, but why just 128? tcpbench is for benchmarking and testing and it should be possible to run more concurrent connections.
it could call getrlimit() to get the actual RLIMIT_NOFILE value which is 128 by default but can be much higher. another variant is the way spamd/spamd.c is doing it by looking at KERN_MAXFILES. reyk On Mon, Jun 08, 2009 at 10:44:37AM -0300, Christiano Farina Haesbaert wrote: > Hi, > > The following patch makes tcpbench(1) non-forking and non-blocking, > I've changed the output to show the file descriptor instead of the > pid. > > The server will no longer die due to excessive forking as it's all > wraped in a single process, we are limited however to 128 fds. > > There is an error when mapping certain kvars with more than one > connection as there is only one single process. > I've noticed a small drop in performance when using more than 10+ > connections, but it's seems irrelevant. > > This was suggested by Henning at misc@, > > Any feedback is welcome, I'll be working on udp support for now. > > Index: tcpbench.c > =================================================================== > RCS file: /cvs/src/usr.bin/tcpbench/tcpbench.c,v > retrieving revision 1.8 > diff -u -d -r1.8 tcpbench.c > --- tcpbench.c 18 Sep 2008 10:23:33 -0000 1.8 > +++ tcpbench.c 8 Jun 2009 13:46:48 -0000 > @@ -50,14 +50,14 @@ > #define DEFAULT_PORT "12345" > #define DEFAULT_STATS_INTERVAL 1000 /* ms */ > #define DEFAULT_BUF 256 * 1024 > +#define MAX_FD 128 > > sig_atomic_t done = 0; > -sig_atomic_t print_stats = 0; > > struct statctx { > struct timeval t_start, t_last, t_cur; > unsigned long long bytes; > - pid_t pid; > + int fd; > u_long tcbaddr; > kvm_t *kh; > char **kvars; > @@ -103,7 +103,6 @@ > static void > alarmhandler(int signo) > { > - print_stats = 1; > signal(signo, alarmhandler); > } > > @@ -309,24 +308,22 @@ > { > struct itimerval itv; > int i; > - > + > if (rflag <= 0) > return; > + sc->fd = fd; > sc->kh = kh; > sc->kvars = kflag; > + sc->bytes = 0; > if (kflag) > sc->tcbaddr = kfind_tcb(kh, ktcbtab, fd, vflag); > gettimeofday(&sc->t_start, NULL); > sc->t_last = sc->t_start; > - signal(SIGALRM, alarmhandler); > itv.it_interval.tv_sec = rflag / 1000; > itv.it_interval.tv_usec = (rflag % 1000) * 1000; > itv.it_value = itv.it_interval; > - setitimer(ITIMER_REAL, &itv, NULL); > - sc->bytes = 0; > - sc->pid = getpid(); > > - printf("%8s %12s %14s %12s ", "pid", "elapsed_ms", "bytes", "Mbps"); > + printf("%8s %12s %14s %12s ", "fd", "elapsed_ms", "bytes", "Mbps"); > if (sc->kvars != NULL) { > for (i = 0; sc->kvars[i] != NULL; i++) > printf("%s%s", i > 0 ? "," : "", sc->kvars[i]); > @@ -342,7 +339,7 @@ > } > > static void > -stats_display(struct statctx *sc) > +stats_display(struct statctx *sc, int rflag) > { > struct timeval t_diff; > unsigned long long total_elapsed, since_last; > @@ -356,11 +353,13 @@ > total_elapsed = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000; > timersub(&sc->t_cur, &sc->t_last, &t_diff); > since_last = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000; > - printf("%8ld %12llu %14llu %12.3Lf ", (long)sc->pid, > + if (since_last <= rflag) > + return; > + printf("%8d %12llu %14llu %12.3Lf ", sc->fd, > total_elapsed, sc->bytes, > (long double)(sc->bytes * 8) / (since_last * 1000.0)); > sc->t_last = sc->t_cur; > - sc->bytes = 0; > + sc->bytes = 0; > > if (sc->kvars != NULL) { > kupdate_stats(sc->kh, sc->tcbaddr, &inpcb, &tcpcb, &sockb); > @@ -403,91 +402,31 @@ > fflush(stdout); > } > > -static void > -stats_finish(struct statctx *sc) > -{ > - struct itimerval itv; > - > - signal(SIGALRM, SIG_DFL); > - bzero(&itv, sizeof(itv)); > - setitimer(ITIMER_REAL, &itv, NULL); > -} > - > -static void __dead > -handle_connection(kvm_t *kvmh, u_long ktcbtab, int sock, int vflag, > - int rflag, char **kflag, int Bflag) > -{ > - char *buf; > - struct pollfd pfd; > - ssize_t n; > - int r; > - struct statctx sc; > - > - if ((buf = malloc(Bflag)) == NULL) > - err(1, "malloc"); > - if ((r = fcntl(sock, F_GETFL, 0)) == -1) > - err(1, "fcntl(F_GETFL)"); > - r |= O_NONBLOCK; > - if (fcntl(sock, F_SETFL, r) == -1) > - err(1, "fcntl(F_SETFL, O_NONBLOCK)"); > - > - signal(SIGINT, exitsighand); > - signal(SIGTERM, exitsighand); > - signal(SIGHUP, exitsighand); > - signal(SIGPIPE, SIG_IGN); > - > - bzero(&pfd, sizeof(pfd)); > - pfd.fd = sock; > - pfd.events = POLLIN; > - > - stats_prepare(&sc, sock, kvmh, ktcbtab, rflag, vflag, kflag); > - > - while (!done) { > - if (print_stats) { > - stats_display(&sc); > - print_stats = 0; > - } > - if (poll(&pfd, 1, INFTIM) == -1) { > - if (errno == EINTR) > - continue; > - err(1, "poll"); > - } > - if ((n = read(pfd.fd, buf, Bflag)) == -1) { > - if (errno == EINTR || errno == EAGAIN) > - continue; > - err(1, "read"); > - } > - if (n == 0) { > - fprintf(stderr, "%8ld closed by remote end\n", > - (long)getpid()); > - done = -1; > - break; > - } > - if (vflag >= 3) > - fprintf(stderr, "read: %zd bytes\n", n); > - stats_update(&sc, n); > - } > - stats_finish(&sc); > - > - free(buf); > - close(sock); > - exit(1); > -} > - > static void __dead > serverloop(kvm_t *kvmh, u_long ktcbtab, struct addrinfo *aitop, > int vflag, int rflag, char **kflag, int Sflag, int Bflag) > { > + > + char *buf; > char tmp[128]; > - int r, sock, client_id, on = 1; > + int i, r, sock, client_id, fdval, nextfree, on = 1; > + size_t listenfds, nfds, lastlfd; > + socklen_t sslen; > + ssize_t n; > struct addrinfo *ai; > - struct pollfd *pfd; > + struct pollfd pfd[MAX_FD]; > struct sockaddr_storage ss; > - socklen_t sslen; > - size_t nfds, i, j; > + struct statctx psc[MAX_FD]; > > - pfd = NULL; > - nfds = 0; > + bzero(pfd, sizeof(pfd)); > + bzero(psc, sizeof(pfd)); > + nfds = listenfds = 0; > + lastlfd = -1; > + if ((buf = malloc(Bflag)) == NULL) > + err(1, "malloc"); > + for (i = 0; i < MAX_FD; ++i) > + pfd[i].fd = -1; > + > for (ai = aitop; ai != NULL; ai = ai->ai_next) { > saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp, sizeof(tmp)); > if (vflag) > @@ -500,9 +439,16 @@ > warn("socket"); > continue; > } > - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, > - &on, sizeof(on)) == -1) > + > + if ((fdval = fcntl(sock, F_GETFL, 0)) == -1) > + err(1, "fcntl(F_GETFL)"); > + fdval |= O_NONBLOCK; > + if (fcntl(sock, F_SETFL, fdval) == -1) > + err(1, "fcntl(F_SETFL, O_NONBLOCK)"); > + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, > + sizeof(on)) == -1) > warn("reuse port"); > + > if (bind(sock, ai->ai_addr, ai->ai_addrlen) != 0) { > if (ai->ai_next == NULL) > err(1, "bind"); > @@ -524,29 +470,23 @@ > close(sock); > continue; > } > - if (nfds > 128) > + if (nfds > MAX_FD - 1) > break; > - if ((pfd = realloc(pfd, ++nfds * sizeof(*pfd))) == NULL) > - errx(1, "realloc(pfd * %zu)", nfds); > + > + nfds++; > + lastlfd = sock; > pfd[nfds - 1].fd = sock; > pfd[nfds - 1].events = POLLIN; > } > freeaddrinfo(aitop); > if (nfds == 0) > errx(1, "No working listen addresses found"); > - > - signal(SIGINT, exitsighand); > - signal(SIGTERM, exitsighand); > - signal(SIGHUP, exitsighand); > - signal(SIGPIPE, SIG_IGN); > - signal(SIGCHLD, SIG_IGN); > - > - if (setpgid(0, 0) == -1) > - err(1, "setpgid"); > - > + listenfds = nfds; > + > client_id = 0; > - while (!done) { > - if ((r = poll(pfd, nfds, INFTIM)) == -1) { > + while (!done) { > + fflush(stdout); > + if ((r = poll(pfd, MAX_FD, INFTIM)) == -1) { > if (errno == EINTR) > continue; > warn("poll"); > @@ -554,52 +494,82 @@ > } > if (vflag >= 3) > fprintf(stderr, "poll: %d\n", r); > - for (i = 0 ; r > 0 && i < nfds; i++) { > - if ((pfd[i].revents & POLLIN) == 0) > + > + for (i = 0 ; i < MAX_FD && r > 0; i++) { > + if (pfd[i].fd == -1 || (pfd[i].revents & POLLIN) == 0) > continue; > + r--; > + > if (vflag >= 3) > fprintf(stderr, "fd %d active\n", pfd[i].fd); > - r--; > - sslen = sizeof(ss); > - if ((sock = accept(pfd[i].fd, (struct sockaddr *)&ss, > - &sslen)) == -1) { > - if (errno == EINTR) > + if (pfd[i].fd <= lastlfd) { > + sslen = sizeof(ss); > + if ((sock = accept(pfd[i].fd, > + (struct sockaddr *)&ss, &sslen)) == -1) { > + if (errno == EINTR) > + continue; > + warn("accept"); > continue; > - warn("accept"); > - break; > + } > + saddr_ntop((struct sockaddr *)&ss, sslen, > + tmp, sizeof(tmp)); > + > + for (nextfree = 0; nextfree < MAX_FD; > nextfree++) > + if (pfd[nextfree].fd == -1) > + break; > + /* We shouldn't fall here as accept will fail > first */ > + if (nextfree >= MAX_FD) { > + fprintf(stderr, "maximum number of > connection reached\n"); > + close(sock); > + continue; > + } > + > + nfds++; > + pfd[nextfree].fd = sock; > + pfd[nextfree].events = POLLIN; > + stats_prepare(&psc[(nfds - 1) - listenfds], > + sock, kvmh, ktcbtab, rflag, vflag, kflag); > + if (vflag) > + fprintf(stderr, "Accepted connection %d > from " > + "%s, fd = %d\n", client_id++, tmp, > sock); > } > - saddr_ntop((struct sockaddr *)&ss, sslen, > - tmp, sizeof(tmp)); > - if (vflag) > - fprintf(stderr, "Accepted connection %d from " > - "%s, fd = %d\n", client_id++, tmp, sock); > - switch (fork()) { > - case -1: > - warn("fork"); > - done = -1; > - break; > - case 0: > - for (j = 0; j < nfds; j++) > - if (j != i) > - close(pfd[j].fd); > - handle_connection(kvmh, ktcbtab, sock, > - vflag, rflag, kflag, Bflag); > - /* NOTREACHED */ > - _exit(1); > - default: > - close(sock); > - break; > + else { > + n = read(pfd[i].fd, buf, Bflag); > + if (n == 0) { > + fprintf(stderr, > + "fd %d closed by remote end\n", > + pfd[i].fd); > + close(pfd[i].fd); > + nfds--; > + pfd[i].fd = -1; > + } > + else if (n == -1) { > + if (errno == EINTR || errno == EAGAIN) > + continue; > + warn("fd %d, read error\n", pfd[i].fd); > + close(pfd[i].fd); > + nfds--; > + pfd[i].fd = -1; > + } > + else { > + if (vflag >= 3) > + fprintf(stderr, > + "read: %zd bytes from fd: > %d\n", > + n, pfd[i].fd); > + stats_update(&psc[i - listenfds], n); > + stats_display(&psc[i - listenfds], > + rflag); > + } > } > if (done == -1) > break; > } > } > + > for (i = 0; i < nfds; i++) > close(pfd[i].fd); > if (done > 0) > warnx("Terminated by signal %d", done); > - signal(SIGTERM, SIG_IGN); > - killpg(0, SIGTERM); > exit(1); > } > > @@ -632,7 +602,7 @@ > else > errx(1, "c getaddrinfo: %s", > gai_strerror(herr)); > } > - > + > for (sock = -1, ai = aitop; ai != NULL; ai = ai->ai_next) { > saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp, > sizeof(tmp)); > @@ -681,18 +651,11 @@ > fprintf(stderr, "%u connections established\n", scnt); > arc4random_buf(buf, Bflag); > > - signal(SIGINT, exitsighand); > - signal(SIGTERM, exitsighand); > - signal(SIGHUP, exitsighand); > - signal(SIGPIPE, SIG_IGN); > - > stats_prepare(&sc, sock, kvmh, ktcbtab, rflag, vflag, kflag); > > while (!done) { > - if (print_stats) { > - stats_display(&sc); > - print_stats = 0; > - } > + stats_display(&sc, rflag); > + > if (poll(pfd, nconn, INFTIM) == -1) { > if (errno == EINTR) > continue; > @@ -717,7 +680,6 @@ > } > } > } > - stats_finish(&sc); > > if (done > 0) > warnx("Terminated by signal %d", done); > @@ -803,6 +765,8 @@ > if (errstr != NULL) > errx(1, "number of connections is %s: %s", > errstr, optarg); > + if (nconn > MAX_FD) > + errx(1, "maximum number of connections is %d", > MAX_FD); > break; > case 'h': > default: > @@ -842,6 +806,11 @@ > errx(1, "kvm: no namelist"); > } else > drop_gid(); > + > + signal(SIGINT, exitsighand); > + signal(SIGTERM, exitsighand); > + signal(SIGHUP, exitsighand); > + signal(SIGPIPE, SIG_IGN); > > if (sflag) > serverloop(kvmh, nl[0].n_value, aitop, vflag, rflag, kflag, > > > > > -- > Christiano Farina HAESBAERT > Do NOT send me html mail.