On Mon, Jun 08, 2009 at 06:14:31PM +0200, Reyk Floeter wrote: > hi, > > the idea sounds ok, but why just 128? tcpbench is for benchmarking > and testing and it should be possible to run more concurrent > connections. > > it could call getrlimit() to get the actual RLIMIT_NOFILE value which > is 128 by default but can be much higher. another variant is the way > spamd/spamd.c is doing it by looking at KERN_MAXFILES. >
Hmm, I didn't know we had a soft limit, I thought you had to recompile the kernel and tune max files, I agree, I'll add support for expanding the limit. Is there any sane limit ? Or I can push as far as it goes ? I didn't pay much attention to the kvars monitoring yet, are those values per process in the kernel ? Another feature I was thinking would be to dump the output in one file per host. What do you guys think ? Besides correcting fd limit, udp support, any other ideas ? Best regards. > > On Mon, Jun 08, 2009 at 10:44:37AM -0300, Christiano Farina Haesbaert wrote: > > Hi, > > > > The following patch makes tcpbench(1) non-forking and non-blocking, > > I've changed the output to show the file descriptor instead of the > > pid. > > > > The server will no longer die due to excessive forking as it's all > > wraped in a single process, we are limited however to 128 fds. > > > > There is an error when mapping certain kvars with more than one > > connection as there is only one single process. > > I've noticed a small drop in performance when using more than 10+ > > connections, but it's seems irrelevant. > > > > This was suggested by Henning at misc@, > > > > Any feedback is welcome, I'll be working on udp support for now. > > > > Index: tcpbench.c > > =================================================================== > > RCS file: /cvs/src/usr.bin/tcpbench/tcpbench.c,v > > retrieving revision 1.8 > > diff -u -d -r1.8 tcpbench.c > > --- tcpbench.c 18 Sep 2008 10:23:33 -0000 1.8 > > +++ tcpbench.c 8 Jun 2009 13:46:48 -0000 > > @@ -50,14 +50,14 @@ > > #define DEFAULT_PORT "12345" > > #define DEFAULT_STATS_INTERVAL 1000 /* ms */ > > #define DEFAULT_BUF 256 * 1024 > > +#define MAX_FD 128 > > > > sig_atomic_t done = 0; > > -sig_atomic_t print_stats = 0; > > > > struct statctx { > > struct timeval t_start, t_last, t_cur; > > unsigned long long bytes; > > - pid_t pid; > > + int fd; > > u_long tcbaddr; > > kvm_t *kh; > > char **kvars; > > @@ -103,7 +103,6 @@ > > static void > > alarmhandler(int signo) > > { > > - print_stats = 1; > > signal(signo, alarmhandler); > > } > > > > @@ -309,24 +308,22 @@ > > { > > struct itimerval itv; > > int i; > > - > > + > > if (rflag <= 0) > > return; > > + sc->fd = fd; > > sc->kh = kh; > > sc->kvars = kflag; > > + sc->bytes = 0; > > if (kflag) > > sc->tcbaddr = kfind_tcb(kh, ktcbtab, fd, vflag); > > gettimeofday(&sc->t_start, NULL); > > sc->t_last = sc->t_start; > > - signal(SIGALRM, alarmhandler); > > itv.it_interval.tv_sec = rflag / 1000; > > itv.it_interval.tv_usec = (rflag % 1000) * 1000; > > itv.it_value = itv.it_interval; > > - setitimer(ITIMER_REAL, &itv, NULL); > > - sc->bytes = 0; > > - sc->pid = getpid(); > > > > - printf("%8s %12s %14s %12s ", "pid", "elapsed_ms", "bytes", "Mbps"); > > + printf("%8s %12s %14s %12s ", "fd", "elapsed_ms", "bytes", "Mbps"); > > if (sc->kvars != NULL) { > > for (i = 0; sc->kvars[i] != NULL; i++) > > printf("%s%s", i > 0 ? "," : "", sc->kvars[i]); > > @@ -342,7 +339,7 @@ > > } > > > > static void > > -stats_display(struct statctx *sc) > > +stats_display(struct statctx *sc, int rflag) > > { > > struct timeval t_diff; > > unsigned long long total_elapsed, since_last; > > @@ -356,11 +353,13 @@ > > total_elapsed = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000; > > timersub(&sc->t_cur, &sc->t_last, &t_diff); > > since_last = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000; > > - printf("%8ld %12llu %14llu %12.3Lf ", (long)sc->pid, > > + if (since_last <= rflag) > > + return; > > + printf("%8d %12llu %14llu %12.3Lf ", sc->fd, > > total_elapsed, sc->bytes, > > (long double)(sc->bytes * 8) / (since_last * 1000.0)); > > sc->t_last = sc->t_cur; > > - sc->bytes = 0; > > + sc->bytes = 0; > > > > if (sc->kvars != NULL) { > > kupdate_stats(sc->kh, sc->tcbaddr, &inpcb, &tcpcb, &sockb); > > @@ -403,91 +402,31 @@ > > fflush(stdout); > > } > > > > -static void > > -stats_finish(struct statctx *sc) > > -{ > > - struct itimerval itv; > > - > > - signal(SIGALRM, SIG_DFL); > > - bzero(&itv, sizeof(itv)); > > - setitimer(ITIMER_REAL, &itv, NULL); > > -} > > - > > -static void __dead > > -handle_connection(kvm_t *kvmh, u_long ktcbtab, int sock, int vflag, > > - int rflag, char **kflag, int Bflag) > > -{ > > - char *buf; > > - struct pollfd pfd; > > - ssize_t n; > > - int r; > > - struct statctx sc; > > - > > - if ((buf = malloc(Bflag)) == NULL) > > - err(1, "malloc"); > > - if ((r = fcntl(sock, F_GETFL, 0)) == -1) > > - err(1, "fcntl(F_GETFL)"); > > - r |= O_NONBLOCK; > > - if (fcntl(sock, F_SETFL, r) == -1) > > - err(1, "fcntl(F_SETFL, O_NONBLOCK)"); > > - > > - signal(SIGINT, exitsighand); > > - signal(SIGTERM, exitsighand); > > - signal(SIGHUP, exitsighand); > > - signal(SIGPIPE, SIG_IGN); > > - > > - bzero(&pfd, sizeof(pfd)); > > - pfd.fd = sock; > > - pfd.events = POLLIN; > > - > > - stats_prepare(&sc, sock, kvmh, ktcbtab, rflag, vflag, kflag); > > - > > - while (!done) { > > - if (print_stats) { > > - stats_display(&sc); > > - print_stats = 0; > > - } > > - if (poll(&pfd, 1, INFTIM) == -1) { > > - if (errno == EINTR) > > - continue; > > - err(1, "poll"); > > - } > > - if ((n = read(pfd.fd, buf, Bflag)) == -1) { > > - if (errno == EINTR || errno == EAGAIN) > > - continue; > > - err(1, "read"); > > - } > > - if (n == 0) { > > - fprintf(stderr, "%8ld closed by remote end\n", > > - (long)getpid()); > > - done = -1; > > - break; > > - } > > - if (vflag >= 3) > > - fprintf(stderr, "read: %zd bytes\n", n); > > - stats_update(&sc, n); > > - } > > - stats_finish(&sc); > > - > > - free(buf); > > - close(sock); > > - exit(1); > > -} > > - > > static void __dead > > serverloop(kvm_t *kvmh, u_long ktcbtab, struct addrinfo *aitop, > > int vflag, int rflag, char **kflag, int Sflag, int Bflag) > > { > > + > > + char *buf; > > char tmp[128]; > > - int r, sock, client_id, on = 1; > > + int i, r, sock, client_id, fdval, nextfree, on = 1; > > + size_t listenfds, nfds, lastlfd; > > + socklen_t sslen; > > + ssize_t n; > > struct addrinfo *ai; > > - struct pollfd *pfd; > > + struct pollfd pfd[MAX_FD]; > > struct sockaddr_storage ss; > > - socklen_t sslen; > > - size_t nfds, i, j; > > + struct statctx psc[MAX_FD]; > > > > - pfd = NULL; > > - nfds = 0; > > + bzero(pfd, sizeof(pfd)); > > + bzero(psc, sizeof(pfd)); > > + nfds = listenfds = 0; > > + lastlfd = -1; > > + if ((buf = malloc(Bflag)) == NULL) > > + err(1, "malloc"); > > + for (i = 0; i < MAX_FD; ++i) > > + pfd[i].fd = -1; > > + > > for (ai = aitop; ai != NULL; ai = ai->ai_next) { > > saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp, sizeof(tmp)); > > if (vflag) > > @@ -500,9 +439,16 @@ > > warn("socket"); > > continue; > > } > > - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, > > - &on, sizeof(on)) == -1) > > + > > + if ((fdval = fcntl(sock, F_GETFL, 0)) == -1) > > + err(1, "fcntl(F_GETFL)"); > > + fdval |= O_NONBLOCK; > > + if (fcntl(sock, F_SETFL, fdval) == -1) > > + err(1, "fcntl(F_SETFL, O_NONBLOCK)"); > > + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, > > + sizeof(on)) == -1) > > warn("reuse port"); > > + > > if (bind(sock, ai->ai_addr, ai->ai_addrlen) != 0) { > > if (ai->ai_next == NULL) > > err(1, "bind"); > > @@ -524,29 +470,23 @@ > > close(sock); > > continue; > > } > > - if (nfds > 128) > > + if (nfds > MAX_FD - 1) > > break; > > - if ((pfd = realloc(pfd, ++nfds * sizeof(*pfd))) == NULL) > > - errx(1, "realloc(pfd * %zu)", nfds); > > + > > + nfds++; > > + lastlfd = sock; > > pfd[nfds - 1].fd = sock; > > pfd[nfds - 1].events = POLLIN; > > } > > freeaddrinfo(aitop); > > if (nfds == 0) > > errx(1, "No working listen addresses found"); > > - > > - signal(SIGINT, exitsighand); > > - signal(SIGTERM, exitsighand); > > - signal(SIGHUP, exitsighand); > > - signal(SIGPIPE, SIG_IGN); > > - signal(SIGCHLD, SIG_IGN); > > - > > - if (setpgid(0, 0) == -1) > > - err(1, "setpgid"); > > - > > + listenfds = nfds; > > + > > client_id = 0; > > - while (!done) { > > - if ((r = poll(pfd, nfds, INFTIM)) == -1) { > > + while (!done) { > > + fflush(stdout); > > + if ((r = poll(pfd, MAX_FD, INFTIM)) == -1) { > > if (errno == EINTR) > > continue; > > warn("poll"); > > @@ -554,52 +494,82 @@ > > } > > if (vflag >= 3) > > fprintf(stderr, "poll: %d\n", r); > > - for (i = 0 ; r > 0 && i < nfds; i++) { > > - if ((pfd[i].revents & POLLIN) == 0) > > + > > + for (i = 0 ; i < MAX_FD && r > 0; i++) { > > + if (pfd[i].fd == -1 || (pfd[i].revents & POLLIN) == 0) > > continue; > > + r--; > > + > > if (vflag >= 3) > > fprintf(stderr, "fd %d active\n", pfd[i].fd); > > - r--; > > - sslen = sizeof(ss); > > - if ((sock = accept(pfd[i].fd, (struct sockaddr *)&ss, > > - &sslen)) == -1) { > > - if (errno == EINTR) > > + if (pfd[i].fd <= lastlfd) { > > + sslen = sizeof(ss); > > + if ((sock = accept(pfd[i].fd, > > + (struct sockaddr *)&ss, &sslen)) == -1) { > > + if (errno == EINTR) > > + continue; > > + warn("accept"); > > continue; > > - warn("accept"); > > - break; > > + } > > + saddr_ntop((struct sockaddr *)&ss, sslen, > > + tmp, sizeof(tmp)); > > + > > + for (nextfree = 0; nextfree < MAX_FD; > > nextfree++) > > + if (pfd[nextfree].fd == -1) > > + break; > > + /* We shouldn't fall here as accept will fail > > first */ > > + if (nextfree >= MAX_FD) { > > + fprintf(stderr, "maximum number of > > connection reached\n"); > > + close(sock); > > + continue; > > + } > > + > > + nfds++; > > + pfd[nextfree].fd = sock; > > + pfd[nextfree].events = POLLIN; > > + stats_prepare(&psc[(nfds - 1) - listenfds], > > + sock, kvmh, ktcbtab, rflag, vflag, kflag); > > + if (vflag) > > + fprintf(stderr, "Accepted connection %d > > from " > > + "%s, fd = %d\n", client_id++, tmp, > > sock); > > } > > - saddr_ntop((struct sockaddr *)&ss, sslen, > > - tmp, sizeof(tmp)); > > - if (vflag) > > - fprintf(stderr, "Accepted connection %d from " > > - "%s, fd = %d\n", client_id++, tmp, sock); > > - switch (fork()) { > > - case -1: > > - warn("fork"); > > - done = -1; > > - break; > > - case 0: > > - for (j = 0; j < nfds; j++) > > - if (j != i) > > - close(pfd[j].fd); > > - handle_connection(kvmh, ktcbtab, sock, > > - vflag, rflag, kflag, Bflag); > > - /* NOTREACHED */ > > - _exit(1); > > - default: > > - close(sock); > > - break; > > + else { > > + n = read(pfd[i].fd, buf, Bflag); > > + if (n == 0) { > > + fprintf(stderr, > > + "fd %d closed by remote end\n", > > + pfd[i].fd); > > + close(pfd[i].fd); > > + nfds--; > > + pfd[i].fd = -1; > > + } > > + else if (n == -1) { > > + if (errno == EINTR || errno == EAGAIN) > > + continue; > > + warn("fd %d, read error\n", pfd[i].fd); > > + close(pfd[i].fd); > > + nfds--; > > + pfd[i].fd = -1; > > + } > > + else { > > + if (vflag >= 3) > > + fprintf(stderr, > > + "read: %zd bytes from fd: > > %d\n", > > + n, pfd[i].fd); > > + stats_update(&psc[i - listenfds], n); > > + stats_display(&psc[i - listenfds], > > + rflag); > > + } > > } > > if (done == -1) > > break; > > } > > } > > + > > for (i = 0; i < nfds; i++) > > close(pfd[i].fd); > > if (done > 0) > > warnx("Terminated by signal %d", done); > > - signal(SIGTERM, SIG_IGN); > > - killpg(0, SIGTERM); > > exit(1); > > } > > > > @@ -632,7 +602,7 @@ > > else > > errx(1, "c getaddrinfo: %s", > > gai_strerror(herr)); > > } > > - > > + > > for (sock = -1, ai = aitop; ai != NULL; ai = ai->ai_next) { > > saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp, > > sizeof(tmp)); > > @@ -681,18 +651,11 @@ > > fprintf(stderr, "%u connections established\n", scnt); > > arc4random_buf(buf, Bflag); > > > > - signal(SIGINT, exitsighand); > > - signal(SIGTERM, exitsighand); > > - signal(SIGHUP, exitsighand); > > - signal(SIGPIPE, SIG_IGN); > > - > > stats_prepare(&sc, sock, kvmh, ktcbtab, rflag, vflag, kflag); > > > > while (!done) { > > - if (print_stats) { > > - stats_display(&sc); > > - print_stats = 0; > > - } > > + stats_display(&sc, rflag); > > + > > if (poll(pfd, nconn, INFTIM) == -1) { > > if (errno == EINTR) > > continue; > > @@ -717,7 +680,6 @@ > > } > > } > > } > > - stats_finish(&sc); > > > > if (done > 0) > > warnx("Terminated by signal %d", done); > > @@ -803,6 +765,8 @@ > > if (errstr != NULL) > > errx(1, "number of connections is %s: %s", > > errstr, optarg); > > + if (nconn > MAX_FD) > > + errx(1, "maximum number of connections is %d", > > MAX_FD); > > break; > > case 'h': > > default: > > @@ -842,6 +806,11 @@ > > errx(1, "kvm: no namelist"); > > } else > > drop_gid(); > > + > > + signal(SIGINT, exitsighand); > > + signal(SIGTERM, exitsighand); > > + signal(SIGHUP, exitsighand); > > + signal(SIGPIPE, SIG_IGN); > > > > if (sflag) > > serverloop(kvmh, nl[0].n_value, aitop, vflag, rflag, kflag, > > > > > > > > > > -- > > Christiano Farina HAESBAERT > > Do NOT send me html mail. > > -- Christiano Farina HAESBAERT Do NOT send me html mail.